You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2013/07/26 10:57:54 UTC

[1/2] [SSHD-244] Abstract the IO layer and provide two providers: mina and plain nio2

Updated Branches:
  refs/heads/master 728903aea -> bb2eb2b5f


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaConnector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaConnector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaConnector.java
new file mode 100644
index 0000000..406fb9d
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaConnector.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import java.net.SocketAddress;
+
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.service.IoConnector;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.IoSessionConfig;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoConnectFuture;
+
+/**
+ */
+public class MinaConnector extends MinaService implements org.apache.sshd.common.io.IoConnector, IoHandler {
+
+    protected volatile IoConnector connector;
+    protected IoSessionConfig sessionConfig;
+
+    public MinaConnector(FactoryManager manager, org.apache.sshd.common.io.IoHandler handler) {
+        super(manager, handler);
+    }
+
+
+    protected IoConnector createConnector() {
+        NioSocketConnector connector = new NioSocketConnector(getNioWorkers());
+        if (sessionConfig != null) {
+            connector.getSessionConfig().setAll(sessionConfig);
+        }
+        return connector;
+    }
+
+    protected IoConnector getConnector() {
+        if (connector == null) {
+            synchronized (this) {
+                if (connector == null) {
+                    connector = createConnector();
+                    connector.setHandler(this);
+                }
+            }
+        }
+        return connector;
+    }
+
+    @Override
+    protected org.apache.mina.core.service.IoService getIoService() {
+        return getConnector();
+    }
+
+    public IoConnectFuture connect(SocketAddress address) {
+        class Future extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
+            Future(Object lock) {
+                super(lock);
+            }
+
+            public org.apache.sshd.common.io.IoSession getSession() {
+                Object v = getValue();
+                return v instanceof org.apache.sshd.common.io.IoSession ? (org.apache.sshd.common.io.IoSession) v : null;
+            }
+
+            public Throwable getException() {
+                Object v = getValue();
+                return v instanceof Throwable ? (Throwable) v : null;
+            }
+
+            public boolean isConnected() {
+                return getValue() instanceof org.apache.sshd.common.io.IoSession;
+            }
+
+            public void setSession(org.apache.sshd.common.io.IoSession session) {
+                setValue(session);
+            }
+
+            public void setException(Throwable exception) {
+                setValue(exception);
+            }
+        }
+        final IoConnectFuture future = new Future(null);
+        getConnector().connect(address).addListener(new IoFutureListener<ConnectFuture>() {
+            public void operationComplete(ConnectFuture cf) {
+                if (cf.getException() != null) {
+                    future.setException(cf.getException());
+                } else if (cf.isCanceled()) {
+                    future.cancel();
+                } else {
+                    future.setSession(getSession(cf.getSession()));
+                }
+            }
+        });
+        return future;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaService.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaService.java
new file mode 100644
index 0000000..065d253
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.sshd.common.FactoryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public abstract class MinaService implements org.apache.sshd.common.io.IoService, IoHandler {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected final FactoryManager manager;
+    protected final org.apache.sshd.common.io.IoHandler handler;
+
+    public MinaService(FactoryManager manager, org.apache.sshd.common.io.IoHandler handler) {
+        this.manager = manager;
+        this.handler = handler;
+    }
+
+    public int getNioWorkers() {
+        String nioWorkers = manager.getProperties().get(FactoryManager.NIO_WORKERS);
+        if (nioWorkers != null && nioWorkers.length() > 0) {
+            int nb = Integer.parseInt(nioWorkers);
+            if (nb > 0) {
+                return nb;
+            }
+        }
+        return FactoryManager.DEFAULT_NIO_WORKERS;
+    }
+
+    protected abstract IoService getIoService();
+
+    public void dispose() {
+        getIoService().dispose();
+    }
+
+    public Map<Long, org.apache.sshd.common.io.IoSession> getManagedSessions() {
+        Map<Long, IoSession> mina = new HashMap<Long, IoSession>(getIoService().getManagedSessions());
+        Map<Long, org.apache.sshd.common.io.IoSession> sessions = new HashMap<Long, org.apache.sshd.common.io.IoSession>();
+        for (Long id : mina.keySet()) {
+            sessions.put(id, getSession(mina.get(id)));
+        }
+        return sessions;
+    }
+
+    public void sessionCreated(IoSession session) throws Exception {
+        org.apache.sshd.common.io.IoSession ioSession = new MinaSession(this, session);
+        session.setAttribute(org.apache.sshd.common.io.IoSession.class, ioSession);
+        handler.sessionCreated(ioSession);
+    }
+
+    public void sessionOpened(IoSession session) throws Exception {
+    }
+
+    public void sessionClosed(IoSession session) throws Exception {
+        handler.sessionClosed(getSession(session));
+    }
+
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+    }
+
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+        handler.exceptionCaught(getSession(session), cause);
+    }
+
+    public void messageReceived(IoSession session, Object message) throws Exception {
+        handler.messageReceived(getSession(session), MinaSupport.asReadable((IoBuffer) message));
+    }
+
+    public void messageSent(IoSession session, Object message) throws Exception {
+    }
+
+    protected org.apache.sshd.common.io.IoSession getSession(IoSession session) {
+        return (org.apache.sshd.common.io.IoSession)
+                session.getAttribute(org.apache.sshd.common.io.IoSession.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaServiceFactory.java
new file mode 100644
index 0000000..2eeb8e7
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoServiceFactory;
+
+/**
+ */
+public class MinaServiceFactory implements IoServiceFactory {
+
+    public IoConnector createConnector(FactoryManager manager, IoHandler handler) {
+        return new MinaConnector(manager, handler);
+    }
+
+    public IoAcceptor createAcceptor(FactoryManager manager, IoHandler handler) {
+        return new MinaAcceptor(manager, handler);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
new file mode 100644
index 0000000..0ebf626
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import java.net.SocketAddress;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoCloseFuture;
+import org.apache.sshd.common.io.IoService;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+
+/**
+ */
+public class MinaSession implements IoSession {
+
+    private final MinaService service;
+    private final org.apache.mina.core.session.IoSession session;
+
+    public MinaSession(MinaService service, org.apache.mina.core.session.IoSession session) {
+        this.service = service;
+        this.session = session;
+    }
+
+    public org.apache.mina.core.session.IoSession getSession() {
+        return session;
+    }
+
+    public void suspend() {
+        session.suspendRead();
+        session.suspendWrite();
+    }
+
+    public Object getAttribute(Object key) {
+        return session.getAttribute(key);
+    }
+
+    public Object setAttribute(Object key, Object value) {
+        return session.setAttribute(key, value);
+    }
+
+    public SocketAddress getRemoteAddress() {
+        return session.getRemoteAddress();
+    }
+
+    public SocketAddress getLocalAddress() {
+        return session.getLocalAddress();
+    }
+
+    public long getId() {
+        return session.getId();
+    }
+
+    public WriteFuture write(byte[] data, int offset, int len) {
+        IoBuffer buffer = IoBuffer.wrap(data, offset, len);
+        return session.write(buffer);
+    }
+
+    public IoCloseFuture close(boolean immediately) {
+        class Future extends DefaultSshFuture<IoCloseFuture> implements IoCloseFuture {
+            Future(Object lock) {
+                super(lock);
+            }
+
+            public boolean isClosed() {
+                return getValue() instanceof Boolean;
+            }
+
+            public void setClosed() {
+                setValue(Boolean.TRUE);
+            }
+        }
+        final IoCloseFuture future = new Future(null);
+        session.close(immediately).addListener(new IoFutureListener<CloseFuture>() {
+            public void operationComplete(CloseFuture cf) {
+                future.setClosed();
+            }
+        });
+        return future;
+    }
+
+    public IoWriteFuture write(Buffer buffer) {
+        class Future extends DefaultSshFuture<IoWriteFuture> implements IoWriteFuture {
+            Future(Object lock) {
+                super(lock);
+            }
+
+            public boolean isWritten() {
+                return getValue() instanceof Boolean;
+            }
+
+            public void setWritten() {
+                setValue(Boolean.TRUE);
+            }
+
+            public Throwable getException() {
+                Object v = getValue();
+                return v instanceof Throwable ? (Throwable) v : null;
+            }
+
+            public void setException(Throwable exception) {
+                if (exception == null) {
+                    throw new IllegalArgumentException("exception");
+                }
+                setValue(exception);
+            }
+        }
+        final IoWriteFuture future = new Future(null);
+        session.write(MinaSupport.asIoBuffer(buffer)).addListener(new IoFutureListener<WriteFuture>() {
+            public void operationComplete(WriteFuture cf) {
+                if (cf.getException() != null) {
+                    future.setException(cf.getException());
+                } else {
+                    future.setWritten();
+                }
+            }
+        });
+        return future;
+    }
+
+    public IoService getService() {
+        return service;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSupport.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSupport.java
new file mode 100644
index 0000000..f9259f4
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSupport.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.Readable;
+
+public class MinaSupport {
+
+    public static Readable asReadable(final IoBuffer buffer) {
+        return new Readable() {
+            public int available() {
+                return buffer.remaining();
+            }
+
+            public void getRawBytes(byte[] data, int offset, int len) {
+                buffer.get(data, offset, len);
+            }
+        };
+    }
+
+    public static IoBuffer asIoBuffer(Buffer buffer) {
+        return IoBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
new file mode 100644
index 0000000..7afdf6c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.nio2;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
+
+/**
+ */
+public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
+
+    private final Map<SocketAddress, AsynchronousServerSocketChannel> channels;
+    private final Map<SocketAddress, AsynchronousServerSocketChannel> unbound;
+    private int backlog = 50;
+
+    public Nio2Acceptor(FactoryManager manager, IoHandler handler) {
+        super(manager, handler);
+        channels = new ConcurrentHashMap<SocketAddress, AsynchronousServerSocketChannel>();
+        unbound = new ConcurrentHashMap<SocketAddress, AsynchronousServerSocketChannel>();
+    }
+
+    public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
+        for (SocketAddress address : addresses) {
+            logger.debug("Binding Nio2Acceptor to address {}", address);
+            AsynchronousServerSocketChannel socket = AsynchronousServerSocketChannel.open(group);
+            socket.setOption(StandardSocketOptions.SO_REUSEADDR, Boolean.TRUE);
+            socket.bind(address, backlog);
+            SocketAddress local = socket.getLocalAddress();
+            channels.put(local, socket);
+            socket.accept(local, new AcceptCompletionHandler(socket));
+        }
+    }
+
+    public void bind(SocketAddress address) throws IOException {
+        bind(Collections.singleton(address));
+    }
+
+    public void unbind() {
+        logger.debug("Unbinding");
+        unbind(getBoundAddresses());
+    }
+
+    public void unbind(Collection<? extends SocketAddress> addresses) {
+        for (SocketAddress address : addresses) {
+            AsynchronousServerSocketChannel channel = channels.remove(address);
+            if (channel != null) {
+                unbound.put(address, channel);
+            }
+        }
+    }
+
+    public void unbind(SocketAddress address) {
+        unbind(Collections.singleton(address));
+    }
+
+    public Set<SocketAddress> getBoundAddresses() {
+        return new HashSet<SocketAddress>(channels.keySet());
+    }
+
+    public void doDispose() {
+        unbind();
+        for (IoSession session : sessions.values()) {
+            session.close(true);
+        }
+        for (SocketAddress address : channels.keySet()) {
+            try {
+                channels.get(address).close();
+            } catch (IOException e) {
+                logger.debug("Exception caught while closing channel", e);
+            }
+        }
+        try {
+            group.shutdownNow();
+            group.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            logger.debug("Exception caught while closing channel group", e);
+        }
+        try {
+            executor.shutdownNow();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            logger.debug("Exception caught while closing executor", e);
+        }
+    }
+
+    class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SocketAddress> {
+        private final AsynchronousServerSocketChannel socket;
+        AcceptCompletionHandler(AsynchronousServerSocketChannel socket) {
+            this.socket = socket;
+        }
+        public void completed(AsynchronousSocketChannel result, SocketAddress address) {
+            // Verify that the address has not been unbound
+            if (!channels.containsKey(address)) {
+                try {
+                    result.close();
+                } catch (IOException e) {
+                    logger.debug("Ignoring error closing accepted connection on unbound socket", e);
+                }
+                acceptorStopped(address);
+                return;
+            }
+            try {
+                // Create a session
+                Nio2Session session = new Nio2Session(Nio2Acceptor.this, handler, result);
+                handler.sessionCreated(session);
+                sessions.put(session.getId(), session);
+                session.startReading();
+                // Accept new connections
+                socket.accept(address, this);
+            } catch (Throwable exc) {
+                failed(exc, address);
+            }
+        }
+        public void failed(Throwable exc, SocketAddress address) {
+            if (!channels.containsKey(address)) {
+                acceptorStopped(address);
+            } else if (!disposing.get()) {
+                logger.warn("Caught exception while accepting incoming connection", exc);
+            }
+        }
+        protected void acceptorStopped(SocketAddress address) {
+            // TODO: check remaining sessions on that address
+            // TODO: and eventually close the server socket
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
new file mode 100644
index 0000000..602aac1
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.nio2;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoConnectFuture;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
+
+/**
+ */
+public class Nio2Connector extends Nio2Service implements IoConnector {
+
+    public Nio2Connector(FactoryManager manager, IoHandler handler) {
+        super(manager, handler);
+    }
+
+    public IoConnectFuture connect(SocketAddress address) {
+        logger.debug("Connecting to {}", address);
+        final IoConnectFuture future = new DefaultIoConnectFuture(null);
+        try {
+            final AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(group);
+            socket.connect(address, null, new CompletionHandler<Void, Object>() {
+                public void completed(Void result, Object attachment) {
+                    try {
+                        Nio2Session session = new Nio2Session(Nio2Connector.this, handler, socket);
+                        handler.sessionCreated(session);
+                        sessions.put(session.getId(), session);
+                        future.setSession(session);
+                        session.startReading();
+                    } catch (Throwable e) {
+                        try {
+                            socket.close();
+                        } catch (IOException t) {
+                            // Ignore
+                        }
+                        future.setException(e);
+                    }
+                }
+                public void failed(Throwable exc, Object attachment) {
+                    future.setException(exc);
+                }
+            });
+        } catch (IOException exc) {
+            future.setException(exc);
+        }
+        return future;
+    }
+
+    static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
+        DefaultIoConnectFuture(Object lock) {
+            super(lock);
+        }
+        public IoSession getSession() {
+            Object v = getValue();
+            return v instanceof IoSession ? (IoSession) v : null;
+        }
+        public Throwable getException() {
+            Object v = getValue();
+            return v instanceof Throwable ? (Throwable) v : null;
+        }
+        public boolean isConnected() {
+            return getValue() instanceof IoSession;
+        }
+        public void setSession(IoSession session) {
+            setValue(session);
+        }
+        public void setException(Throwable exception) {
+            setValue(exception);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
new file mode 100644
index 0000000..64cbc51
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.nio2;
+
+import java.io.IOException;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoService;
+import org.apache.sshd.common.io.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public abstract class Nio2Service implements IoService {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    protected final FactoryManager manager;
+    protected final IoHandler handler;
+    protected final ExecutorService executor;
+    protected final AsynchronousChannelGroup group;
+    protected final Map<Long, IoSession> sessions;
+    protected final AtomicBoolean disposing = new AtomicBoolean();
+
+    protected Nio2Service(FactoryManager manager, IoHandler handler) {
+        logger.debug("Creating {}", getClass().getSimpleName());
+        try {
+            this.manager = manager;
+            this.handler = handler;
+            executor = Executors.newFixedThreadPool(getNioWorkers());
+            group = AsynchronousChannelGroup.withThreadPool(executor);
+            sessions = new ConcurrentHashMap<Long, IoSession>();
+        } catch (IOException e) {
+            throw new RuntimeSshException(e);
+        }
+    }
+
+    public int getNioWorkers() {
+        String nioWorkers = manager.getProperties().get(FactoryManager.NIO_WORKERS);
+        if (nioWorkers != null && nioWorkers.length() > 0) {
+            int nb = Integer.parseInt(nioWorkers);
+            if (nb > 0) {
+                return nb;
+            }
+        }
+        return FactoryManager.DEFAULT_NIO_WORKERS;
+    }
+
+    public void dispose() {
+        if (disposing.compareAndSet(false, true)) {
+            logger.debug("Disposing {}", getClass().getSimpleName());
+            doDispose();
+        }
+    }
+
+    protected void doDispose() {
+        for (IoSession session : sessions.values()) {
+            session.close(true);
+        }
+        group.shutdown();
+    }
+
+    public Map<Long, IoSession> getManagedSessions() {
+        return Collections.unmodifiableMap(sessions);
+    }
+
+    public void sessionClosed(Nio2Session session) {
+        sessions.remove(session.getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
new file mode 100644
index 0000000..447b631
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.nio2;
+
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoServiceFactory;
+
+/**
+ */
+public class Nio2ServiceFactory implements IoServiceFactory {
+
+    public IoConnector createConnector(FactoryManager manager, IoHandler handler) {
+        return new Nio2Connector(manager, handler);
+    }
+
+    public IoAcceptor createAcceptor(FactoryManager manager, IoHandler handler) {
+        return new Nio2Acceptor(manager, handler);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
new file mode 100644
index 0000000..f49ef2a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.nio2;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoCloseFuture;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoService;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.Readable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class Nio2Session implements IoSession {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Nio2Session.class);
+    private static final AtomicLong sessionIdGenerator = new AtomicLong(100);
+
+    private final long id = sessionIdGenerator.incrementAndGet();
+    private final Nio2Service service;
+    private final IoHandler handler;
+    private final AsynchronousSocketChannel socket;
+    private final Map<Object, Object> attributes = new HashMap<Object, Object>();
+    private final SocketAddress localAddress;
+    private final SocketAddress remoteAddress;
+
+    private final AtomicBoolean closing = new AtomicBoolean();
+    private final IoCloseFuture closeFuture = new DefaultIoCloseFuture(null);
+    private final Queue<DefaultIoWriteFuture> writes = new LinkedTransferQueue<DefaultIoWriteFuture>();
+    private final AtomicReference<DefaultIoWriteFuture> currentWrite = new AtomicReference<DefaultIoWriteFuture>();
+
+    public Nio2Session(Nio2Service service, IoHandler handler, AsynchronousSocketChannel socket) throws IOException {
+        this.service = service;
+        this.handler = handler;
+        this.socket = socket;
+        this.localAddress = socket.getLocalAddress();
+        this.remoteAddress = socket.getRemoteAddress();
+        LOGGER.debug("Creating Nio2Session on {} from {}", localAddress, remoteAddress);
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public Object getAttribute(Object key) {
+        return attributes.get(key);
+    }
+
+    public Object setAttribute(Object key, Object value) {
+        return attributes.put(key, value);
+    }
+
+    public SocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    public void suspend() {
+        try {
+            this.socket.shutdownInput();
+        } catch (IOException e) {
+            // Ignore
+        }
+        try {
+            this.socket.shutdownOutput();
+        } catch (IOException e) {
+            // Ignore
+        }
+    }
+
+    public IoWriteFuture write(Buffer buffer) {
+        LOGGER.debug("Writing {} bytes", buffer.available());
+        ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
+        final DefaultIoWriteFuture future = new DefaultIoWriteFuture(null, buf);
+        if (closing.get()) {
+            Throwable exc = new ClosedChannelException();
+            future.setException(exc);
+            exceptionCaught(exc);
+            return future;
+        }
+        writes.add(future);
+        startWriting();
+        return future;
+    }
+
+    private void exceptionCaught(Throwable exc) {
+        if (!closing.get()) {
+            if (!socket.isOpen()) {
+                close(true);
+            } else {
+                try {
+                    LOGGER.debug("Caught exception, now calling handler");
+                    handler.exceptionCaught(this, exc);
+                } catch (Throwable t) {
+                    LOGGER.info("Exception handler threw exception, closing the session", t);
+                    close(true);
+                }
+            }
+        }
+    }
+
+    private void startWriting() {
+        final DefaultIoWriteFuture future = writes.peek();
+        if (future != null) {
+            if (currentWrite.compareAndSet(null, future)) {
+                socket.write(future.buffer, null, new CompletionHandler<Integer, Object>() {
+                    public void completed(Integer result, Object attachment) {
+                        future.setWritten();
+                        finishWrite();
+                    }
+                    public void failed(Throwable exc, Object attachment) {
+                        future.setException(exc);
+                        exceptionCaught(exc);
+                        finishWrite();
+                    }
+                    private void finishWrite() {
+                        synchronized (writes) {
+                            writes.remove(future);
+                            writes.notifyAll();
+                        }
+                        currentWrite.compareAndSet(future, null);
+                        startWriting();
+                    }
+                });
+            }
+        }
+    }
+
+    public IoCloseFuture close(boolean immediately) {
+        if (closing.compareAndSet(false, true)) {
+            LOGGER.debug("Closing Nio2Session");
+            if (!immediately) {
+                try {
+                    boolean logged = false;
+                    synchronized (writes) {
+                        while (!writes.isEmpty()) {
+                            if (!logged) {
+                                LOGGER.debug("Waiting for writes to finish");
+                                logged = true;
+                            }
+                            writes.wait();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Wait has been interrupted, just close the socket
+                }
+            }
+            for (;;) {
+                DefaultIoWriteFuture future = writes.poll();
+                if (future != null) {
+                    future.setException(new ClosedChannelException());
+                } else {
+                    break;
+                }
+            }
+            try {
+                LOGGER.debug("Closing socket");
+                socket.close();
+            } catch (IOException e) {
+                LOGGER.info("Exception caught while closing session", e);
+            }
+            service.sessionClosed(this);
+            closeFuture.setClosed();
+            try {
+                handler.sessionClosed(this);
+            } catch (Exception e) {
+                // Ignore
+                LOGGER.debug("Exception caught while calling IoHandler#sessionClosed", e);
+            }
+        }
+        return closeFuture;
+    }
+
+    public IoService getService() {
+        return service;
+    }
+
+    public void startReading() {
+        final ByteBuffer buffer = ByteBuffer.allocate(32 * 1024);
+        socket.read(buffer, null, new CompletionHandler<Integer, Object>() {
+            public void completed(Integer result, Object attachment) {
+                try {
+                    if (result >= 0) {
+                        LOGGER.debug("Read {} bytes", result);
+                        buffer.flip();
+                        Readable buf = new Readable() {
+                            public int available() {
+                                return buffer.remaining();
+                            }
+                            public void getRawBytes(byte[] data, int offset, int len) {
+                                buffer.get(data, offset, len);
+                            }
+                        };
+                        handler.messageReceived(Nio2Session.this, buf);
+                        startReading();
+                    } else {
+                        LOGGER.debug("Socket has been disconnected, closing IoSession now");
+                        Nio2Session.this.close(true);
+                    }
+                } catch (Throwable exc) {
+                    failed(exc, attachment);
+                }
+            }
+            public void failed(Throwable exc, Object attachment) {
+                exceptionCaught(exc);
+            }
+        });
+    }
+
+    static class DefaultIoCloseFuture extends DefaultSshFuture<IoCloseFuture> implements IoCloseFuture {
+        DefaultIoCloseFuture(Object lock) {
+            super(lock);
+        }
+        public boolean isClosed() {
+            return getValue() instanceof Boolean;
+        }
+        public void setClosed() {
+            setValue(Boolean.TRUE);
+        }
+    }
+
+    static class DefaultIoWriteFuture extends DefaultSshFuture<IoWriteFuture> implements IoWriteFuture {
+        private final ByteBuffer buffer;
+        DefaultIoWriteFuture(Object lock, ByteBuffer buffer) {
+            super(lock);
+            this.buffer = buffer;
+        }
+        public boolean isWritten() {
+            return getValue() instanceof Boolean;
+        }
+        public void setWritten() {
+            setValue(Boolean.TRUE);
+        }
+        public Throwable getException() {
+            Object v = getValue();
+            return v instanceof Throwable ? (Throwable) v : null;
+        }
+        public void setException(Throwable exception) {
+            if (exception == null) {
+                throw new IllegalArgumentException("exception");
+            }
+            setValue(exception);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index f86e787..e855689 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -27,11 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.IoFuture;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.Cipher;
@@ -51,8 +46,12 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoCloseFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.BufferUtils;
+import org.apache.sshd.common.util.Readable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -239,7 +238,7 @@ public abstract class AbstractSession implements Session {
      * @param buffer the new buffer received
      * @throws Exception if an error occurs while decoding or handling the data
      */
-    public void messageReceived(IoBuffer buffer) throws Exception {
+    public void messageReceived(Readable buffer) throws Exception {
         synchronized (decodeLock) {
             decoderBuffer.putBuffer(buffer);
             // One of those property will be set by the constructor and the other
@@ -307,8 +306,8 @@ public abstract class AbstractSession implements Session {
      */
     public CloseFuture close(final boolean immediately) {
 	    final AbstractSession s = this;
-        class IoSessionCloser implements IoFutureListener {
-            public void operationComplete(IoFuture future) {
+        class IoSessionCloser implements SshFutureListener<IoCloseFuture> {
+            public void operationComplete(IoCloseFuture future) {
                 synchronized (lock) {
                     log.debug("IoSession closed");
                     closeFuture.setClosed();
@@ -364,14 +363,13 @@ public abstract class AbstractSession implements Session {
      * @return a future that can be used to check when the packet has actually been sent
      * @throws java.io.IOException if an error occured when encoding sending the packet
      */
-    public WriteFuture writePacket(Buffer buffer) throws IOException {
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         // Synchronize all write requests as needed by the encoding algorithm
         // and also queue the write request in this synchronized block to ensure
         // packets are sent in the correct order
         synchronized (encodeLock) {
             encode(buffer);
-            IoBuffer bb = IoBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
-            return ioSession.write(bb);
+            return ioSession.write(buffer);
         }
     }
 
@@ -606,11 +604,8 @@ public abstract class AbstractSession implements Session {
      * @param ident our identification to send
      */
     protected void sendIdentification(String ident) {
-        IoBuffer buffer = IoBuffer.allocate(32);
-        buffer.setAutoExpand(true);
-        buffer.put((ident + "\r\n").getBytes());
-        buffer.flip();
-        ioSession.write(buffer);
+        byte[] data = (ident + "\r\n").getBytes();
+        ioSession.write(new Buffer(data));
     }
 
     /**
@@ -907,9 +902,8 @@ public abstract class AbstractSession implements Session {
         buffer.putInt(reason);
         buffer.putString(msg);
         buffer.putString("");
-        WriteFuture f = writePacket(buffer);
-        f.addListener(new IoFutureListener() {
-            public void operationComplete(IoFuture future) {
+        writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
+            public void operationComplete(IoWriteFuture future) {
                 close(true);
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
index 9ecf340..8c8429c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
@@ -18,13 +18,12 @@
  */
 package org.apache.sshd.common.session;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.common.AbstractSessionIoHandler;
 import org.apache.sshd.common.SessionListener;
+import org.apache.sshd.common.io.IoSession;
 
 /**
  * An abstract base factory of sessions.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
index 80dfe17..d82f980 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
@@ -35,7 +35,6 @@ import java.security.spec.InvalidKeySpecException;
 import java.security.spec.RSAPrivateCrtKeySpec;
 import java.security.spec.RSAPublicKeySpec;
 
-import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.sshd.common.KeyPairProvider;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
@@ -45,7 +44,7 @@ import org.apache.sshd.common.SshException;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public final class Buffer {
+public final class Buffer implements Readable {
 
     public static final int DEFAULT_SIZE = 256;
     public static final int MAX_LEN = 65536;
@@ -329,17 +328,10 @@ public final class Buffer {
         data[wpos++] = b;
     }
 
-    public void putBuffer(Buffer buffer) {
+    public void putBuffer(Readable buffer) {
         int r = buffer.available();
         ensureCapacity(r);
-        System.arraycopy(buffer.data, buffer.rpos, data, wpos, r);
-        wpos += r;
-    }
-
-    public void putBuffer(IoBuffer buffer) {
-        int r = buffer.remaining();
-        ensureCapacity(r);
-        buffer.get(data, wpos, r);
+        buffer.getRawBytes(data, wpos, r);
         wpos += r;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/util/Readable.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Readable.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Readable.java
new file mode 100644
index 0000000..4420e19
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Readable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.util;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface Readable {
+
+    int available();
+
+    void getRawBytes(byte[] data, int offset, int len);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index d47e10c..49bac66 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -22,9 +22,7 @@ import java.util.List;
 
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.ForwardingAcceptorFactory;
 import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.server.auth.gss.GSSAuthenticator;
 
 /**
@@ -151,12 +149,4 @@ public interface ServerFactoryManager extends FactoryManager {
      */
     List<NamedFactory<Command>> getSubsystemFactories();
 
-    /**
-     * Retrieve the IoAcceptor factory to be used to accept incoming connections
-     * for X11 Forwards.
-     * 
-     * @return A <code>ForwardNioAcceptorFactory</code>
-     */
-    ForwardingAcceptorFactory getX11ForwardingAcceptorFactory();
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index 35ab197..6791be5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -28,8 +28,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.SshServer;
 import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.client.future.OpenFuture;
@@ -41,6 +39,8 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.server.ServerFactoryManager;
@@ -121,12 +121,12 @@ public class ServerSession extends AbstractSession {
     }
 
     @Override
-    public WriteFuture writePacket(Buffer buffer) throws IOException {
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         boolean rescheduleIdleTimer = getState() == State.Running;
         if (rescheduleIdleTimer) {
             unscheduleIdleTimer();
         }
-        WriteFuture future = super.writePacket(buffer);
+        IoWriteFuture future = super.writePacket(buffer);
         if (rescheduleIdleTimer) {
             scheduleIdleTimer();
         }
@@ -662,6 +662,7 @@ public class ServerSession extends AbstractSession {
                     writePacket(buffer);
                 }
             } catch (Exception e) {
+                log.debug("Error starting tcpip forward", e);
                 if (wantReply) {
                     buffer = createBuffer(SshConstants.Message.SSH_MSG_REQUEST_FAILURE, 0);
                     writePacket(buffer);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
index af0e6a2..05e8e2f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.sshd.server.session;
 
-import org.apache.mina.core.session.IoSession;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.session.AbstractSessionFactory;
 import org.apache.sshd.server.ServerFactoryManager;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
index 08d2642..d9f539a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
@@ -25,7 +25,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.mina.util.NamePreservingRunnable;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
@@ -109,11 +108,11 @@ public class InvertedShellWrapper implements Command, SessionAware {
         shellIn = shell.getInputStream();
         shellOut = shell.getOutputStream();
         shellErr = shell.getErrorStream();
-        executor.execute(new NamePreservingRunnable(new Runnable() {
+        executor.execute(new Runnable() {
             public void run() {
                 pumpStreams();
             }
-        }, "inverted-shell-pump"));
+        });
     }
 
     public void destroy() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 84d8e9a..0fb907f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -21,16 +21,7 @@ package org.apache.sshd.server.x11;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.util.EnumSet;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoEventType;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
@@ -39,7 +30,13 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoCloseFuture;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.server.session.ServerSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class X11ForwardSupport extends IoHandlerAdapter {
+public class X11ForwardSupport implements IoHandler {
 
     private static String xauthCommand = System.getProperty("sshd.xauthCommand", "xauth");
 
@@ -71,15 +68,8 @@ public class X11ForwardSupport extends IoHandlerAdapter {
 
     public synchronized void initialize() {
         if (this.acceptor == null) {
-            NioSocketAcceptor acceptor = session.getServerFactoryManager().getX11ForwardingAcceptorFactory().createNioSocketAcceptor(session);
-            acceptor.setHandler(this);
-            acceptor.setReuseAddress(true);
-            acceptor.getFilterChain().addLast(
-                    "executor",
-                    new ExecutorFilter(EnumSet.complementOf(
-                            EnumSet.of(IoEventType.SESSION_CREATED)).toArray(
-                            new IoEventType[0])));
-            this.acceptor = acceptor;
+            this.acceptor = session.getFactoryManager().getIoServiceFactory()
+                    .createAcceptor(session.getFactoryManager(), this);
         }
     }
 
@@ -112,7 +102,7 @@ public class X11ForwardSupport extends IoHandlerAdapter {
 
         if (displayNumber >= MAX_DISPLAYS) {
             log.error("Failed to allocate internet-domain X11 display socket.");
-            if (acceptor.getLocalAddresses().isEmpty()) {
+            if (acceptor.getBoundAddresses().isEmpty()) {
                 close();
             }
             return null;
@@ -139,7 +129,6 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         }
     }
 
-    @Override
     public void sessionCreated(IoSession session) throws Exception {
         ChannelForwardedX11 channel = new ChannelForwardedX11(session);
         session.setAttribute(ChannelForwardedX11.class, channel);
@@ -153,7 +142,6 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         }
     }
 
-    @Override
     public void sessionClosed(IoSession session) throws Exception {
         ChannelForwardedX11 channel = (ChannelForwardedX11) session.getAttribute(ChannelForwardedX11.class);
         if ( channel != null ){
@@ -161,18 +149,14 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         }
     }
 
-    @Override
-    public void messageReceived(IoSession session, Object message) throws Exception {
+    public void messageReceived(IoSession session, Readable message) throws Exception {
         ChannelForwardedX11 channel = (ChannelForwardedX11) session.getAttribute(ChannelForwardedX11.class);
-        IoBuffer ioBuffer = (IoBuffer) message;
-        int r = ioBuffer.remaining();
-        byte[] b = new byte[r];
-        ioBuffer.get(b, 0, r);
-        channel.getOut().write(b, 0, r);
+        Buffer buffer = new Buffer();
+        buffer.putBuffer(message);
+        channel.getOut().write(buffer.array(), buffer.rpos(), buffer.available());
         channel.getOut().flush();
     }
 
-    @Override
     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
         cause.printStackTrace();
         session.close(false);
@@ -212,8 +196,8 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         @Override
         protected synchronized CloseFuture preClose(boolean immediately) {
             final CloseFuture future = new DefaultCloseFuture(null);
-            serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
-                public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+            serverSession.close(immediately).addListener(new SshFutureListener<IoCloseFuture>() {
+                public void operationComplete(IoCloseFuture f) {
                     future.setClosed();
                 }
             });
@@ -221,11 +205,8 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         }
 
         protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
-            IoBuffer buf = IoBuffer.allocate(len);
-            buf.put(data, off, len);
-            buf.flip();
             localWindow.consumeAndCheck(len);
-            serverSession.write(buf);
+            serverSession.write(new Buffer(data, off, len));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
index 0c8ac67..6a1d25a 100644
--- a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
@@ -21,8 +21,8 @@ package org.apache.sshd;
 import java.net.ServerSocket;
 import java.security.KeyPair;
 
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.common.KeyPairProvider;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.server.ServerFactoryManager;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index 5db1059..5ff6126 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -35,6 +35,10 @@ import org.apache.sshd.common.KeyPairProvider;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.mina.MinaSession;
+import org.apache.sshd.common.io.nio2.Nio2Session;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.BufferUtils;
@@ -385,10 +389,9 @@ public class ClientTest {
             buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
             buffer.putString("Cancel");
             buffer.putString("");
-            WriteFuture f = cs.writePacket(buffer);
+            IoWriteFuture f = cs.writePacket(buffer);
             f.await();
-            cs.getIoSession().suspendRead();
-            cs.getIoSession().suspendWrite();
+            suspend(cs.getIoSession());
 
             TestEchoShellFactory.TestEchoShell.latch.await();
         } finally {
@@ -396,6 +399,14 @@ public class ClientTest {
         }
     }
 
+    private void suspend(IoSession ioSession) {
+        if (ioSession instanceof MinaSession) {
+            ((MinaSession) ioSession).suspend();
+        } else {
+            ((Nio2Session) ioSession).suspend();
+        }
+    }
+
     public static class TestEchoShellFactory extends EchoShellFactory {
         @Override
         public Command create() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index 6d17c8e..1ff0ad5 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.client.SessionFactory;
 import org.apache.sshd.client.channel.ChannelShell;
 import org.apache.sshd.client.future.AuthFuture;
@@ -34,6 +33,7 @@ import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SessionListener;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.command.ScpCommandFactory;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/resources/log4j.properties b/sshd-core/src/test/resources/log4j.properties
index 9172dc9..13cd606 100644
--- a/sshd-core/src/test/resources/log4j.properties
+++ b/sshd-core/src/test/resources/log4j.properties
@@ -28,11 +28,11 @@ log4j.rootLogger=INFO, stdout
 # CONSOLE appender
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d | %-5.5p | %-16.16t | %-32.32c{1} | %-64.64C %4L | %m%n
 
 # File appender
 log4j.appender.out=org.apache.log4j.FileAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.layout.ConversionPattern=%d | %-5.5p | %-16.16t | %-32.32c{1} | %-64.64C %4L | %m%n
+log4j.appender.out.file=target/sshd-tests.log
 log4j.appender.out.append=true


[2/2] git commit: [SSHD-244] Abstract the IO layer and provide two providers: mina and plain nio2

Posted by gn...@apache.org.
[SSHD-244] Abstract the IO layer and provide two providers: mina and plain nio2


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/bb2eb2b5
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/bb2eb2b5
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/bb2eb2b5

Branch: refs/heads/master
Commit: bb2eb2b5f3192943ff9c23b391b44cad9899aeeb
Parents: 728903a
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Jul 25 14:21:37 2013 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Fri Jul 26 10:57:35 2013 +0200

----------------------------------------------------------------------
 sshd-core/pom.xml                               |  20 +-
 .../org.apache.sshd.common.io.IoServiceFactory  |  21 ++
 .../main/java/org/apache/sshd/SshClient.java    |  47 ++--
 .../main/java/org/apache/sshd/SshServer.java    | 108 +------
 .../org/apache/sshd/client/SessionFactory.java  |   2 +-
 .../sshd/client/session/ClientSessionImpl.java  |   2 +-
 .../sshd/common/AbstractFactoryManager.java     |  21 +-
 .../sshd/common/AbstractSessionIoHandler.java   |  16 +-
 .../org/apache/sshd/common/FactoryManager.java  |  13 +-
 .../sshd/common/ForwardingAcceptorFactory.java  |  37 ---
 .../java/org/apache/sshd/common/Session.java    |   4 +-
 .../sshd/common/channel/AbstractChannel.java    |   9 +-
 .../DefaultForwardingAcceptorFactory.java       |  72 -----
 .../common/forward/DefaultTcpipForwarder.java   |  45 ++-
 .../sshd/common/forward/TcpipClientChannel.java |  14 +-
 .../sshd/common/forward/TcpipServerChannel.java |  50 ++--
 .../sshd/common/future/DefaultSshFuture.java    |   7 +-
 .../sshd/common/io/DefaultIoServiceFactory.java |  94 +++++++
 .../org/apache/sshd/common/io/IoAcceptor.java   |  42 +++
 .../sshd/common/io/IoAcceptorFactory.java       |  29 ++
 .../apache/sshd/common/io/IoCloseFuture.java    |  42 +++
 .../apache/sshd/common/io/IoConnectFuture.java  |  65 +++++
 .../org/apache/sshd/common/io/IoConnector.java  |  30 ++
 .../org/apache/sshd/common/io/IoHandler.java    |  35 +++
 .../org/apache/sshd/common/io/IoService.java    |  38 +++
 .../apache/sshd/common/io/IoServiceFactory.java |  31 ++
 .../org/apache/sshd/common/io/IoSession.java    |  84 ++++++
 .../apache/sshd/common/io/IoWriteFuture.java    |  51 ++++
 .../sshd/common/io/mina/MinaAcceptor.java       | 117 ++++++++
 .../sshd/common/io/mina/MinaConnector.java      | 113 ++++++++
 .../apache/sshd/common/io/mina/MinaService.java | 104 +++++++
 .../sshd/common/io/mina/MinaServiceFactory.java |  39 +++
 .../apache/sshd/common/io/mina/MinaSession.java | 146 ++++++++++
 .../apache/sshd/common/io/mina/MinaSupport.java |  42 +++
 .../sshd/common/io/nio2/Nio2Acceptor.java       | 158 +++++++++++
 .../sshd/common/io/nio2/Nio2Connector.java      |  96 +++++++
 .../apache/sshd/common/io/nio2/Nio2Service.java |  95 +++++++
 .../sshd/common/io/nio2/Nio2ServiceFactory.java |  39 +++
 .../apache/sshd/common/io/nio2/Nio2Session.java | 282 +++++++++++++++++++
 .../sshd/common/session/AbstractSession.java    |  32 +--
 .../common/session/AbstractSessionFactory.java  |   3 +-
 .../org/apache/sshd/common/util/Buffer.java     |  14 +-
 .../org/apache/sshd/common/util/Readable.java   |  30 ++
 .../sshd/server/ServerFactoryManager.java       |  10 -
 .../sshd/server/session/ServerSession.java      |   9 +-
 .../sshd/server/session/SessionFactory.java     |   2 +-
 .../sshd/server/shell/InvertedShellWrapper.java |   5 +-
 .../sshd/server/x11/X11ForwardSupport.java      |  55 ++--
 .../org/apache/sshd/AuthenticationTest.java     |   2 +-
 .../test/java/org/apache/sshd/ClientTest.java   |  17 +-
 .../test/java/org/apache/sshd/ServerTest.java   |   2 +-
 sshd-core/src/test/resources/log4j.properties   |   6 +-
 52 files changed, 2019 insertions(+), 428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/pom.xml
----------------------------------------------------------------------
diff --git a/sshd-core/pom.xml b/sshd-core/pom.xml
index 01ac397..97d3b23 100644
--- a/sshd-core/pom.xml
+++ b/sshd-core/pom.xml
@@ -110,7 +110,7 @@
                     <instructions>
                         <Bundle-SymbolicName>org.apache.sshd.core</Bundle-SymbolicName>
                         <Import-Package>
-                            org.apache.mina*;version="[2,3)",
+                            org.apache.mina*;version="[2,3)";resolution:=optional,
                             com.jcraft.jzlib*;resolution:=optional,
                             org.bouncycastle*;resolution:=optional,
                             org.apache.tomcat.jni*;resolution:=optional,
@@ -148,7 +148,25 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <reportsDirectory>${project.build.directory}/surefire-reports-mina</reportsDirectory>
+                    <systemProperties>
+                        <org.apache.sshd.common.io.IoServiceFactory>org.apache.sshd.common.io.mina.MinaServiceFactory</org.apache.sshd.common.io.IoServiceFactory>
+                    </systemProperties>
                 </configuration>
+                <executions>
+                    <execution>
+                        <id>nio2</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <reportsDirectory>${project.build.directory}/surefire-reports-nio2</reportsDirectory>
+                            <systemProperties>
+                                <org.apache.sshd.common.io.IoServiceFactory>org.apache.sshd.common.io.nio2.Nio2ServiceFactory</org.apache.sshd.common.io.IoServiceFactory>
+                            </systemProperties>
+                        </configuration>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/filtered-resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactory
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/filtered-resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactory b/sshd-core/src/main/filtered-resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactory
new file mode 100644
index 0000000..d168059
--- /dev/null
+++ b/sshd-core/src/main/filtered-resources/META-INF/services/org.apache.sshd.common.io.IoServiceFactory
@@ -0,0 +1,21 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##  http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+org.apache.sshd.common.io.nio2.Nio2ServiceFactory
+org.apache.sshd.common.io.mina.MinaServiceFactory

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 5682aca..7484b8c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -35,9 +35,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
 
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.apache.sshd.client.ClientFactoryManager;
 import org.apache.sshd.client.ServerKeyVerifier;
 import org.apache.sshd.client.SessionFactory;
@@ -54,12 +51,11 @@ import org.apache.sshd.common.AbstractFactoryManager;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.Cipher;
 import org.apache.sshd.common.Compression;
-import org.apache.sshd.common.ForwardingAcceptorFactory;
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.KeyExchange;
 import org.apache.sshd.common.Mac;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.Signature;
-import org.apache.sshd.common.TcpipForwarderFactory;
 import org.apache.sshd.common.cipher.AES128CBC;
 import org.apache.sshd.common.cipher.AES128CTR;
 import org.apache.sshd.common.cipher.AES192CBC;
@@ -71,9 +67,12 @@ import org.apache.sshd.common.cipher.BlowfishCBC;
 import org.apache.sshd.common.cipher.TripleDESCBC;
 import org.apache.sshd.common.compression.CompressionNone;
 import org.apache.sshd.common.file.nativefs.NativeFileSystemFactory;
-import org.apache.sshd.common.forward.DefaultForwardingAcceptorFactory;
 import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory;
 import org.apache.sshd.common.forward.TcpipServerChannel;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.DefaultIoServiceFactory;
+import org.apache.sshd.common.io.IoConnectFuture;
+import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.mac.HMACMD5;
 import org.apache.sshd.common.mac.HMACMD596;
 import org.apache.sshd.common.mac.HMACSHA1;
@@ -139,6 +138,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
     protected IoConnector connector;
     protected SessionFactory sessionFactory;
     protected UserInteraction userInteraction;
+    protected Factory<IoConnector> connectorFactory;
 
     private ServerKeyVerifier serverKeyVerifier;
 
@@ -191,9 +191,6 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         if (getTcpipForwarderFactory() == null) {
             throw new IllegalArgumentException("TcpipForwarderFactory not set");
         }
-        if (getTcpipForwardingAcceptorFactory() == null) {
-            throw new IllegalArgumentException("TcpipForwardingAcceptorFactory not set");
-        }
         if (getServerKeyVerifier() == null) {
             throw new IllegalArgumentException("ServerKeyVerifier not set");
         }
@@ -208,21 +205,18 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
             factories.add(getAgentFactory().getChannelForwardingFactory());
             setChannelFactories(factories);
         }
+        if (getIoServiceFactory() == null) {
+            setIoServiceFactory(new DefaultIoServiceFactory());
+        }
     }
 
     public void start() {
         checkConfig();
-        connector = createAcceptor();
-
         if (sessionFactory == null) {
-            sessionFactory = new SessionFactory();
+            sessionFactory = createSessionFactory();
         }
         sessionFactory.setClient(this);
-        connector.setHandler(sessionFactory);
-    }
-
-    protected NioSocketConnector createAcceptor() {
-        return new NioSocketConnector(getNioWorkers());
+        connector = createConnector();
     }
 
     public void stop() {
@@ -244,14 +238,14 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         return connect(address);
     }
 
-    public ConnectFuture connect(SocketAddress address) throws IOException {
+    public ConnectFuture connect(SocketAddress address) {
         assert address != null;
         if (connector == null) {
             throw new IllegalStateException("SshClient not started. Please call start() method before connecting to a server");
         }
         final ConnectFuture connectFuture = new DefaultConnectFuture(null);
-        connector.connect(address).addListener(new IoFutureListener<org.apache.mina.core.future.ConnectFuture>() {
-            public void operationComplete(org.apache.mina.core.future.ConnectFuture future) {
+        connector.connect(address).addListener(new SshFutureListener<IoConnectFuture>() {
+            public void operationComplete(IoConnectFuture future) {
                 if (future.isCanceled()) {
                     connectFuture.cancel();
                 } else if (future.getException() != null) {
@@ -265,6 +259,14 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         return connectFuture;
     }
 
+    protected IoConnector createConnector() {
+        return getIoServiceFactory().createConnector(this, getSessionFactory());
+    }
+
+    protected SessionFactory createSessionFactory() {
+        return new SessionFactory();
+    }
+
     /**
      * Setup a default client.  The client does not require any additional setup.
      *
@@ -301,12 +303,9 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
                 new SignatureRSA.Factory()));
         client.setChannelFactories(Arrays.<NamedFactory<Channel>>asList(
                 new TcpipServerChannel.ForwardedTcpipFactory()));
-        ForwardingAcceptorFactory faf = new DefaultForwardingAcceptorFactory();
-        client.setTcpipForwardingAcceptorFactory(faf);
-        TcpipForwarderFactory tcpipForwarderFactory = new DefaultTcpipForwarderFactory();
-        client.setTcpipForwarderFactory(tcpipForwarderFactory);
         client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE);
         client.setFileSystemFactory(new NativeFileSystemFactory());
+        client.setTcpipForwarderFactory(new DefaultTcpipForwarderFactory());
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index acc1ca1..59c2f79 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -21,11 +21,11 @@ package org.apache.sshd;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.security.InvalidKeyException;
 import java.security.PublicKey;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,16 +33,11 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.core.session.IoSessionConfig;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.apache.sshd.common.AbstractFactoryManager;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.Cipher;
 import org.apache.sshd.common.Compression;
 import org.apache.sshd.common.Factory;
-import org.apache.sshd.common.ForwardingAcceptorFactory;
 import org.apache.sshd.common.ForwardingFilter;
 import org.apache.sshd.common.KeyExchange;
 import org.apache.sshd.common.Mac;
@@ -61,11 +56,13 @@ import org.apache.sshd.common.cipher.BlowfishCBC;
 import org.apache.sshd.common.cipher.TripleDESCBC;
 import org.apache.sshd.common.compression.CompressionNone;
 import org.apache.sshd.common.file.nativefs.NativeFileSystemFactory;
-import org.apache.sshd.common.forward.DefaultForwardingAcceptorFactory;
 import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory;
 import org.apache.sshd.common.forward.TcpipServerChannel;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.DefaultIoServiceFactory;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.mac.HMACMD5;
 import org.apache.sshd.common.mac.HMACMD596;
 import org.apache.sshd.common.mac.HMACSHA1;
@@ -130,9 +127,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     protected IoAcceptor acceptor;
     protected String host;
     protected int port;
-    protected int backlog = 50;
-    protected boolean reuseAddress = true;
-    protected IoSessionConfig sessionConfig;
     protected List<NamedFactory<UserAuth>> userAuthFactories;
     protected Factory<Command> shellFactory;
     protected SessionFactory sessionFactory;
@@ -141,7 +135,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     protected PasswordAuthenticator passwordAuthenticator;
     protected PublickeyAuthenticator publickeyAuthenticator;
     protected GSSAuthenticator gssAuthenticator;
-    protected ForwardingAcceptorFactory x11ForwardingAcceptorFactory;
 
     public SshServer() {
     }
@@ -167,30 +160,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
         this.port = port;
     }
 
-    public boolean getReuseAddress() {
-        return reuseAddress;
-    }
-
-    public void setReuseAddress(boolean reuseAddress) {
-        this.reuseAddress = reuseAddress;
-    }
-
-    public int getBacklog() {
-        return backlog;
-    }
-
-    public void setBacklog(int backlog) {
-        this.backlog = backlog;
-    }
-
-    public IoSessionConfig getSessionConfig() {
-        return sessionConfig;
-    }
-
-    public void setSessionConfig(IoSessionConfig sessionConfig) {
-        this.sessionConfig = sessionConfig;
-    }
-
     public List<NamedFactory<UserAuth>> getUserAuthFactories() {
         return userAuthFactories;
     }
@@ -255,14 +224,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
       this.gssAuthenticator = gssAuthenticator;
     }
 
-    public void setX11ForwardNioSocketAcceptorFactory(ForwardingAcceptorFactory f) {
-        x11ForwardingAcceptorFactory = f;
-    }
-
-    public ForwardingAcceptorFactory getX11ForwardingAcceptorFactory() {
-        return x11ForwardingAcceptorFactory;
-    }
-
     public void setTcpipForwardingFilter(ForwardingFilter forwardingFilter) {
         this.tcpipForwardingFilter = forwardingFilter;
     }
@@ -316,11 +277,8 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
         if (getFileSystemFactory() == null) {
             throw new IllegalArgumentException("FileSystemFactory not set");
         }
-        if (getTcpipForwardingAcceptorFactory() == null) {
-            throw new IllegalArgumentException("TcpipForwardingAcceptorFactory not set");
-        }
-        if (getX11ForwardingAcceptorFactory() == null) {
-            throw new IllegalArgumentException("X11ForwardingAcceptorFactory not set");
+        if (getIoServiceFactory() == null) {
+            setIoServiceFactory(new DefaultIoServiceFactory());
         }
     }
 
@@ -331,15 +289,11 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
      */
     public void start() throws IOException {
         checkConfig();
-        acceptor = createAcceptor();
-        configure(acceptor);
-
-        SessionFactory handler = sessionFactory;
-        if (handler == null) {
-            handler = createSessionFactory();
+        if (sessionFactory == null) {
+            sessionFactory = createSessionFactory();
         }
-        handler.setServer(this);
-        acceptor.setHandler(handler);
+        sessionFactory.setServer(this);
+        acceptor = createAcceptor();
 
         if (host != null) {
             String[] hosts = host.split(",");
@@ -356,9 +310,9 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
             }
             acceptor.bind(addresses);
         } else {
-            acceptor.bind(new InetSocketAddress(port));
+            acceptor.bind(Collections.singleton(new InetSocketAddress(port)));
             if (port == 0) {
-                port = ((InetSocketAddress) acceptor.getLocalAddress()).getPort();
+                port = ((InetSocketAddress) acceptor.getBoundAddresses().iterator().next()).getPort();
             }
         }
     }
@@ -373,7 +327,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     public void stop(boolean immediately) throws InterruptedException {
         List<AbstractSession> sessions = new ArrayList<AbstractSession>();
         if (acceptor != null) {
-            acceptor.setCloseOnDeactivation(false);
             acceptor.unbind();
             sessions = getActiveSessions();
         }
@@ -390,7 +343,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
             latch.await();
         }
         if (acceptor != null) {
-            acceptor.dispose(true);
+            acceptor.dispose();
         }
         acceptor = null;
         if (shutdownExecutor && executor != null) {
@@ -414,35 +367,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     }
 
     protected IoAcceptor createAcceptor() {
-        return new NioSocketAcceptor(getNioWorkers());
-    }
-
-    protected void configure(IoAcceptor acceptor) {
-        if (acceptor instanceof NioSocketAcceptor) {
-            final NioSocketAcceptor nio = (NioSocketAcceptor) acceptor;
-            nio.setReuseAddress(reuseAddress);
-            nio.setBacklog(backlog);
-
-            // MINA itself forces our socket receive buffer to 1024 bytes
-            // by default, despite what the operating system defaults to.
-            // This limits us to about 3 MB/s incoming data transfer.  By
-            // forcing back to the operating system default we can get a
-            // decent transfer rate again.
-            //
-            final Socket s = new Socket();
-            try {
-              try {
-                  nio.getSessionConfig().setReceiveBufferSize(s.getReceiveBufferSize());
-              } finally {
-                  s.close();
-              }
-            } catch (IOException e) {
-                log.warn("cannot adjust SO_RCVBUF back to system default", e);
-            }
-        }
-        if (sessionConfig != null) {
-            acceptor.getSessionConfig().setAll(sessionConfig);
-        }
+        return getIoServiceFactory().createAcceptor(this, getSessionFactory());
     }
 
     protected SessionFactory createSessionFactory() {
@@ -482,12 +407,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
                 new SignatureDSA.Factory(),
                 new SignatureRSA.Factory()));
         sshd.setFileSystemFactory(new NativeFileSystemFactory());
-
         sshd.setTcpipForwarderFactory(new DefaultTcpipForwarderFactory());
-        ForwardingAcceptorFactory faf = new DefaultForwardingAcceptorFactory();
-        sshd.setTcpipForwardingAcceptorFactory(faf);
-        sshd.setX11ForwardNioSocketAcceptorFactory(faf);
-        
         return sshd;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
index 37abcb2..c5b3885 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.sshd.client;
 
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.client.session.ClientSessionImpl;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.session.AbstractSessionFactory;
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 1b46efc..9d1e650 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.client.ClientFactoryManager;
@@ -56,6 +55,7 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.server.channel.OpenChannelException;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
index 5a69865..eaa1088 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
@@ -27,6 +27,9 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoServiceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +43,7 @@ public abstract class AbstractFactoryManager implements FactoryManager {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     protected Map<String,String> properties = new HashMap<String,String>();
+    protected IoServiceFactory ioServiceFactory;
     protected List<NamedFactory<KeyExchange>> keyExchangeFactories;
     protected List<NamedFactory<Cipher>> cipherFactories;
     protected List<NamedFactory<Compression>> compressionFactories;
@@ -53,7 +57,6 @@ public abstract class AbstractFactoryManager implements FactoryManager {
     protected ScheduledExecutorService executor;
     protected boolean shutdownExecutor;
     protected TcpipForwarderFactory tcpipForwarderFactory;
-    protected ForwardingAcceptorFactory tcpipForwardingAcceptorFactory;
     protected ForwardingFilter tcpipForwardingFilter;
     protected FileSystemFactory fileSystemFactory;
 
@@ -61,6 +64,14 @@ public abstract class AbstractFactoryManager implements FactoryManager {
         loadVersion();
     }
 
+    public IoServiceFactory getIoServiceFactory() {
+        return ioServiceFactory;
+    }
+
+    public void setIoServiceFactory(IoServiceFactory ioServiceFactory) {
+        this.ioServiceFactory = ioServiceFactory;
+    }
+
     public List<NamedFactory<KeyExchange>> getKeyExchangeFactories() {
         return keyExchangeFactories;
     }
@@ -201,14 +212,6 @@ public abstract class AbstractFactoryManager implements FactoryManager {
         this.tcpipForwarderFactory = tcpipForwarderFactory;
     }
 
-    public ForwardingAcceptorFactory getTcpipForwardingAcceptorFactory() {
-        return tcpipForwardingAcceptorFactory;
-    }
-
-    public void setTcpipForwardingAcceptorFactory(ForwardingAcceptorFactory f) {
-        tcpipForwardingAcceptorFactory = f;
-    }
-
     public ForwardingFilter getTcpipForwardingFilter() {
         return tcpipForwardingFilter;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/AbstractSessionIoHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractSessionIoHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractSessionIoHandler.java
index cb48960..80f14d2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractSessionIoHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractSessionIoHandler.java
@@ -18,32 +18,29 @@
  */
 package org.apache.sshd.common;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractSession;
+import org.apache.sshd.common.util.Readable;
 
 /**
  * TODO Add javadoc
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractSessionIoHandler extends IoHandlerAdapter {
+public abstract class AbstractSessionIoHandler implements IoHandler {
 
     protected abstract AbstractSession createSession(IoSession ioSession) throws Exception;
 
-    @Override
     public void sessionCreated(IoSession ioSession) throws Exception {
         AbstractSession session = createSession(ioSession);
         AbstractSession.attachSession(ioSession, session);
     }
 
-    @Override
     public void sessionClosed(IoSession ioSession) throws Exception {
         AbstractSession.getSession(ioSession).close(true);
     }
 
-    @Override
     public void exceptionCaught(IoSession ioSession, Throwable cause) throws Exception {
         AbstractSession session = AbstractSession.getSession(ioSession, true);
         if (session != null) {
@@ -53,9 +50,8 @@ public abstract class AbstractSessionIoHandler extends IoHandlerAdapter {
         }
     }
 
-    @Override
-    public void messageReceived(IoSession ioSession, Object message) throws Exception {
-        AbstractSession.getSession(ioSession).messageReceived((IoBuffer) message);
+    public void messageReceived(IoSession ioSession, Readable message) throws Exception {
+        AbstractSession.getSession(ioSession).messageReceived(message);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 28f5f4a..f5bc47c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -24,6 +24,9 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoServiceFactory;
 
 /**
  * This interface allows retrieving all the <code>NamedFactory</code> used
@@ -77,6 +80,8 @@ public interface FactoryManager {
      */
     String getVersion();
 
+    IoServiceFactory getIoServiceFactory();
+
     /**
      * Retrieve the list of named factories for <code>KeyExchange</code>.
      *
@@ -149,14 +154,6 @@ public interface FactoryManager {
     ScheduledExecutorService getScheduledExecutorService();
 
     /**
-     * Retrieve the IoAcceptor factory to be used to accept incoming connections
-     * to port forwards.
-     *
-     * @return A <code>ForwardNioAcceptorFactory</code>
-     */
-    ForwardingAcceptorFactory getTcpipForwardingAcceptorFactory();
-
-    /**
      * Retrieve the <code>ForwardingFilter</code> to be used by the SSH server.
      * If no filter has been configured (i.e. this method returns
      * <code>null</code>), then all forwarding requests will be rejected.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/ForwardingAcceptorFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/ForwardingAcceptorFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/ForwardingAcceptorFactory.java
deleted file mode 100644
index 1fefe11..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/common/ForwardingAcceptorFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.common;
-
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-
-/**
- * A factory for creating NioSocketAcceptor objects for Port & X11 forwarding
- */
-public interface ForwardingAcceptorFactory {
-
-    /**
-     * Creates the NioSocketAcceptor to be used for forwards for this
-     * ServerSession.
-     *
-     * @param session the Session the connections are forwarded through
-     * @return the NioSocketAcceptor that will listen for connections
-     */
-    public NioSocketAcceptor createNioSocketAcceptor(Session session);
-
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/Session.java
index de21817..d28b29e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/Session.java
@@ -20,7 +20,7 @@ package org.apache.sshd.common;
 
 import java.io.IOException;
 
-import org.apache.mina.core.future.WriteFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Buffer;
 
 /**
@@ -101,7 +101,7 @@ public interface Session {
      * @return a future that can be used to check when the packet has actually been sent
      * @throws java.io.IOException if an error occured when encoding sending the packet
      */
-    WriteFuture writePacket(Buffer buffer) throws IOException;
+    IoWriteFuture writePacket(Buffer buffer) throws IOException;
 
     /**
      * Send a global request and wait for the response.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index a0caaf8..c9ce78e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -21,17 +21,14 @@ package org.apache.sshd.common.channel;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.future.WriteFuture;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
-import org.apache.sshd.common.future.DefaultSshFuture;
-import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.BufferUtils;
 import org.slf4j.Logger;
@@ -115,8 +112,8 @@ public abstract class AbstractChannel implements Channel {
                         Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
                         buffer.putInt(recipient);
                         try {
-                            session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
-                                public void operationComplete(WriteFuture future) {
+                            session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
+                                public void operationComplete(IoWriteFuture future) {
                                     if (closedByOtherSide) {
                                         log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
                                         postClose();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingAcceptorFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingAcceptorFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingAcceptorFactory.java
deleted file mode 100644
index 641fb3f..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingAcceptorFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.common.forward;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.apache.sshd.common.ForwardingAcceptorFactory;
-import org.apache.sshd.common.Session;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Default factory for creating NioSocketAcceptors for Port & X11 Forwarding
- */
-public class DefaultForwardingAcceptorFactory implements ForwardingAcceptorFactory {
-
-    /** The log. */
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    public NioSocketAcceptor createNioSocketAcceptor(Session s) {
-        NioSocketAcceptor nio = new NioSocketAcceptor();
-        nio.setReuseAddress(true);
-
-        configureReceiveBufferSize(nio);
-
-        return nio;
-    }
-
-    /**
-     * MINA itself forces our socket receive buffer to 1024 bytes by default,
-     * despite what the operating system defaults to. This limits us to about 3
-     * MB/s incoming data transfer. By forcing back to the operating system
-     * default we can get a decent transfer rate again.
-     * 
-     * If this method is unable to adjust the buffer size it will log a warning
-     * and return.
-     * 
-     * @param nio
-     *            The NioSocketAcceptor to fix the buffer on
-     */
-    private void configureReceiveBufferSize(NioSocketAcceptor nio) {
-        final Socket s = new Socket();
-        try {
-            try {
-                nio.getSessionConfig().setReceiveBufferSize(s.getReceiveBufferSize());
-            } finally {
-                s.close();
-            }
-        } catch (IOException e) {
-            log.warn("cannot adjust SO_RCVBUF back to system default", e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index ba986e1..fc63d20 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -21,19 +21,11 @@ package org.apache.sshd.common.forward;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoEventType;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.ForwardingFilter;
@@ -43,7 +35,11 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.TcpipForwarder;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.Readable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +48,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForwarder {
+public class DefaultTcpipForwarder implements TcpipForwarder, IoHandler {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTcpipForwarder.class);
 
@@ -88,7 +84,7 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
     public synchronized void stopLocalPortForwarding(SshdSocketAddress local) throws IOException {
         if (localToRemote.remove(local.getPort()) != null && acceptor != null) {
             acceptor.unbind(local.toInetSocketAddress());
-            if (acceptor.getLocalAddresses().isEmpty()) {
+            if (acceptor.getBoundAddresses().isEmpty()) {
                 close();
             }
         }
@@ -144,7 +140,7 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
     public synchronized void localPortForwardingCancelled(SshdSocketAddress local) throws IOException {
         if (localForwards.remove(local) && acceptor != null) {
             acceptor.unbind(local.toInetSocketAddress());
-            if (acceptor.getLocalAddresses().isEmpty()) {
+            if (acceptor.getBoundAddresses().isEmpty()) {
                 close();
             }
         }
@@ -152,11 +148,8 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
 
     public synchronized void initialize() {
         if (this.acceptor == null) {
-            NioSocketAcceptor acceptor = session.getFactoryManager().getTcpipForwardingAcceptorFactory().createNioSocketAcceptor(session);
-            acceptor.setHandler(this);
-            acceptor.setReuseAddress(true);
-            acceptor.getFilterChain().addLast("executor", new ExecutorFilter(EnumSet.complementOf(EnumSet.of(IoEventType.SESSION_CREATED)).toArray(new IoEventType[0])));
-            this.acceptor = acceptor;
+            this.acceptor = session.getFactoryManager().getIoServiceFactory()
+                    .createAcceptor(session.getFactoryManager(), this);
         }
     }
 
@@ -171,7 +164,6 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
     // IoHandler implementation
     //
 
-    @Override
     public void sessionCreated(final IoSession session) throws Exception {
         final TcpipClientChannel channel;
         int localPort = ((InetSocketAddress) session.getLocalAddress()).getPort();
@@ -194,7 +186,6 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
         });
     }
 
-    @Override
     public void sessionClosed(IoSession session) throws Exception {
         TcpipClientChannel channel = (TcpipClientChannel) session.getAttribute(TcpipClientChannel.class);
         if (channel != null) {
@@ -203,19 +194,15 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
         }
     }
 
-    @Override
-    public void messageReceived(IoSession session, Object message) throws Exception {
+    public void messageReceived(IoSession session, Readable message) throws Exception {
         TcpipClientChannel channel = (TcpipClientChannel) session.getAttribute(TcpipClientChannel.class);
-        IoBuffer ioBuffer = (IoBuffer) message;
-        int r = ioBuffer.remaining();
-        byte[] b = new byte[r];
-        ioBuffer.get(b, 0, r);
+        Buffer buffer = new Buffer();
+        buffer.putBuffer(message);
         channel.waitFor(ClientChannel.OPENED | ClientChannel.CLOSED, Long.MAX_VALUE);
-        channel.getOut().write(b, 0, r);
+        channel.getOut().write(buffer.array(), buffer.rpos(), buffer.available());
         channel.getOut().flush();
     }
 
-    @Override
     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
         cause.printStackTrace();
         session.close(false);
@@ -227,10 +214,10 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
 
     private SshdSocketAddress doBind(SshdSocketAddress address) throws IOException {
         initialize();
-        Set<SocketAddress> before = acceptor.getLocalAddresses();
+        Set<SocketAddress> before = acceptor.getBoundAddresses();
         try {
             acceptor.bind(address.toInetSocketAddress());
-            Set<SocketAddress> after = acceptor.getLocalAddresses();
+            Set<SocketAddress> after = acceptor.getBoundAddresses();
             after.removeAll(before);
             if (after.isEmpty()) {
                 throw new IOException("Error binding to " + address + ": no local addresses bound");
@@ -241,7 +228,7 @@ public class DefaultTcpipForwarder extends IoHandlerAdapter implements TcpipForw
             InetSocketAddress result = (InetSocketAddress) after.iterator().next();
             return new SshdSocketAddress(address.getHostName(), result.getPort());
         } catch (IOException bindErr) {
-            if (acceptor.getLocalAddresses().isEmpty()) {
+            if (acceptor.getBoundAddresses().isEmpty()) {
                 close();
             }
             throw bindErr;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index ccd1f77..93ab19d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -21,9 +21,6 @@ package org.apache.sshd.common.forward;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
@@ -33,6 +30,9 @@ import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoCloseFuture;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.Buffer;
 
 /**
@@ -101,8 +101,8 @@ public class TcpipClientChannel extends AbstractClientChannel {
     @Override
     protected synchronized CloseFuture preClose(boolean immediately) {
         final CloseFuture future = new DefaultCloseFuture(null);
-        serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
-            public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+        serverSession.close(immediately).addListener(new SshFutureListener<IoCloseFuture>() {
+            public void operationComplete(IoCloseFuture f) {
                 future.setClosed();
             }
         });
@@ -110,9 +110,7 @@ public class TcpipClientChannel extends AbstractClientChannel {
     }
 
     protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
-        IoBuffer buf = IoBuffer.allocate(len);
-        buf.put(data, off, len);
-        buf.flip();
+        Buffer buf = new Buffer(data, off, len);
         localWindow.consumeAndCheck(len);
         serverSession.write(buf);
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 5bd314b..aa7b91a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -22,14 +22,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.ConnectFuture;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.core.service.IoHandler;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Channel;
@@ -39,9 +31,13 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoConnectFuture;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 import org.apache.sshd.server.channel.OpenChannelException;
 
@@ -111,33 +107,32 @@ public class TcpipServerChannel extends AbstractServerChannel {
             return f;
         }
 
-
-        connector = new NioSocketConnector();
         out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.Message.SSH_MSG_CHANNEL_DATA);
-        IoHandler handler = new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession session, Object message) throws Exception {
+        IoHandler handler = new IoHandler() {
+            public void messageReceived(IoSession session, Readable message) throws Exception {
                 if (closing.get()) {
                     log.debug("Ignoring write to channel {} in CLOSING state", id);
                 } else {
-                    IoBuffer ioBuffer = (IoBuffer) message;
-                    int r = ioBuffer.remaining();
-                    byte[] b = new byte[r];
-                    ioBuffer.get(b, 0, r);
-                    out.write(b, 0, r);
+                    Buffer buffer = new Buffer();
+                    buffer.putBuffer(message);
+                    out.write(buffer.array(), buffer.rpos(), buffer.available());
                     out.flush();
                 }
             }
-
-            @Override
+            public void sessionCreated(IoSession session) throws Exception {
+            }
             public void sessionClosed(IoSession session) throws Exception {
                 close(false);
             }
+            public void exceptionCaught(IoSession ioSession, Throwable cause) throws Exception {
+                close(true);
+            }
         };
-        connector.setHandler(handler);
-        ConnectFuture future = connector.connect(address.toInetSocketAddress());
-        future.addListener(new IoFutureListener<ConnectFuture>() {
-            public void operationComplete(ConnectFuture future) {
+        connector = getSession().getFactoryManager().getIoServiceFactory()
+                .createConnector(getSession().getFactoryManager(), handler);
+        IoConnectFuture future = connector.connect(address.toInetSocketAddress());
+        future.addListener(new SshFutureListener<IoConnectFuture>() {
+            public void operationComplete(IoConnectFuture future) {
                 if (future.isConnected()) {
                     ioSession = future.getSession();
                     f.setOpened();
@@ -188,10 +183,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
     }
 
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
-        IoBuffer buf = IoBuffer.allocate(len);
-        buf.put(data, off, len);
-        buf.flip();
-        ioSession.write(buf);
+        ioSession.write(new Buffer(data, off, len));
     }
 
     protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index a7985c3..0f88223 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -22,7 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.util.ExceptionMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A default implementation of {@link SshFuture}.
@@ -31,6 +32,8 @@ import org.apache.mina.util.ExceptionMonitor;
  */
 public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
 
+    final Logger logger = LoggerFactory.getLogger(getClass());
+
     /** A default value to indicate the future has been canceled */
     private static final Object CANCELED = new Object();
 
@@ -339,7 +342,7 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
         try {
             l.operationComplete((T) this);
         } catch (Throwable t) {
-            ExceptionMonitor.getInstance().exceptionCaught(t);
+            logger.warn("Listener threw an exception", t);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactory.java
new file mode 100644
index 0000000..4111bfa
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.sshd.common.FactoryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class DefaultIoServiceFactory implements IoServiceFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultIoServiceFactory.class);
+
+    public IoConnector createConnector(FactoryManager manager, IoHandler handler) {
+        return newInstance(IoServiceFactory.class).createConnector(manager, handler);
+    }
+
+    public IoAcceptor createAcceptor(FactoryManager manager, IoHandler handler) {
+        return newInstance(IoServiceFactory.class).createAcceptor(manager, handler);
+    }
+
+    private static <T> T newInstance(Class<T> clazz) {
+        String factory = System.getProperty(clazz.getName());
+        if (factory != null) {
+            return newInstance(clazz, factory);
+        }
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        if (cl != null) {
+            T t = tryLoad(ServiceLoader.load(clazz, cl));
+            if (t != null) {
+                return t;
+            }
+        }
+        if (cl != DefaultIoServiceFactory.class.getClassLoader()) {
+            T t = tryLoad(ServiceLoader.load(clazz, DefaultIoServiceFactory.class.getClassLoader()));
+            if (t != null) {
+                return t;
+            }
+        }
+        throw new IllegalStateException("Cound not find a valid sshd io provider");
+    }
+
+    private static <T> T tryLoad(ServiceLoader<T> loader) {
+        Iterator<T> it = loader.iterator();
+        while (it.hasNext()) {
+            try {
+                return it.next();
+            } catch (Throwable t) {
+                LOGGER.trace("Exception while loading factory from ServiceLoader", t);
+            }
+        }
+        return null;
+    }
+
+    private static <T> T newInstance(Class<T> clazz, String factory) {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        if (cl != null) {
+            try {
+                return clazz.cast(cl.loadClass(factory).newInstance());
+            } catch (Throwable t) {
+                LOGGER.trace("Exception while loading factory " + factory, t);
+            }
+        }
+        if (cl != DefaultIoServiceFactory.class.getClassLoader()) {
+            try {
+                return clazz.cast(DefaultIoServiceFactory.class.getClassLoader().loadClass(factory).newInstance());
+            } catch (Throwable t) {
+                LOGGER.trace("Exception while loading factory " + factory, t);
+            }
+        }
+        throw new IllegalStateException("Unable to create instance of class " + factory);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptor.java
new file mode 100644
index 0000000..3433160
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ */
+public interface IoAcceptor extends IoService {
+
+    void bind(Collection<? extends SocketAddress> addresses) throws IOException;
+
+    void bind(SocketAddress address) throws IOException;
+
+    void unbind(Collection<? extends SocketAddress> addresses);
+
+    void unbind(SocketAddress address);
+
+    void unbind();
+
+    Set<SocketAddress> getBoundAddresses();
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptorFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptorFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptorFactory.java
new file mode 100644
index 0000000..86fb54b
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoAcceptorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.FactoryManager;
+
+/**
+ */
+public interface IoAcceptorFactory {
+
+    IoAcceptor create(FactoryManager manager);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoCloseFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoCloseFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoCloseFuture.java
new file mode 100644
index 0000000..507fd59
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoCloseFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.future.SshFuture;
+
+/**
+ * An {@link org.apache.sshd.common.future.SshFuture} for asynchronous close requests.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface IoCloseFuture extends SshFuture<IoCloseFuture> {
+
+    /**
+     * Returns <tt>true</tt> if the close request is finished and the target is closed.
+     */
+    boolean isClosed();
+
+    /**
+     * Marks this future as closed and notifies all threads waiting for this
+     * future.  This method is invoked by SSHD internally.  Please do not call
+     * this method directly.
+     */
+    void setClosed();
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnectFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnectFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnectFuture.java
new file mode 100644
index 0000000..351e5e7
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnectFuture.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.future.SshFuture;
+
+public interface IoConnectFuture extends SshFuture<IoConnectFuture> {
+
+    IoSession getSession();
+
+    /**
+     * Returns the cause of the connection failure.
+     *
+     * @return <tt>null</tt> if the connect operation is not finished yet,
+     *         or if the connection attempt is successful.
+     */
+    Throwable getException();
+
+    /**
+     * Returns <tt>true</tt> if the connect operation is finished successfully.
+     */
+    boolean isConnected();
+
+    /**
+     * Returns {@code true} if the connect operation has been canceled by
+     * {@link #cancel()} method.
+     */
+    boolean isCanceled();
+
+    /**
+     * Sets the newly connected session and notifies all threads waiting for
+     * this future.  This method is invoked by SSHD internally.  Please do not
+     * call this method directly.
+     */
+    void setSession(IoSession session);
+
+    /**
+     * Sets the exception caught due to connection failure and notifies all
+     * threads waiting for this future.  This method is invoked by SSHD
+     * internally.  Please do not call this method directly.
+     */
+    void setException(Throwable exception);
+
+    /**
+     * Cancels the connection attempt and notifies all threads waiting for
+     * this future.
+     */
+    void cancel();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
new file mode 100644
index 0000000..a195dca
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ */
+public interface IoConnector extends IoService {
+
+    IoConnectFuture connect(SocketAddress address);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoHandler.java
new file mode 100644
index 0000000..1d877e4
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.util.Readable;
+
+/**
+ */
+public interface IoHandler {
+
+    void sessionCreated(IoSession session) throws Exception;
+
+    void sessionClosed(IoSession session) throws Exception;
+
+    void exceptionCaught(IoSession ioSession, Throwable cause) throws Exception;
+
+    void messageReceived(IoSession session, Readable message) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoService.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoService.java
new file mode 100644
index 0000000..c4f74f7
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.util.Map;
+
+/**
+ */
+public interface IoService {
+
+    void dispose();
+
+    /**
+     * Returns the map of all sessions which are currently managed by this
+     * service.  The key of map is the {@link IoSession#getId() ID} of the
+     * session.
+     *
+     * @return the sessions. An empty collection if there's no session.
+     */
+    Map<Long, IoSession> getManagedSessions();
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactory.java
new file mode 100644
index 0000000..5226e8c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.FactoryManager;
+
+/**
+ */
+public interface IoServiceFactory {
+
+    IoConnector createConnector(FactoryManager manager, IoHandler handler);
+
+    IoAcceptor createAcceptor(FactoryManager manager, IoHandler handler);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
new file mode 100644
index 0000000..b65861d
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import java.net.SocketAddress;
+
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.util.Buffer;
+
+public interface IoSession {
+
+    /**
+     * Returns a unique identifier for this session.  Every session has its own
+     * ID which is different from each other.
+     */
+    long getId();
+
+    /**
+     * Returns the value of the user-defined attribute of this session.
+     *
+     * @param key the key of the attribute
+     * @return <tt>null</tt> if there is no attribute with the specified key
+     */
+    Object getAttribute(Object key);
+
+    /**
+     * Sets a user-defined attribute.
+     *
+     * @param key   the key of the attribute
+     * @param value the value of the attribute
+     * @return The old value of the attribute.  <tt>null</tt> if it is new.
+     */
+    Object setAttribute(Object key, Object value);
+
+    /**
+     * Returns the socket address of remote peer.
+     */
+    SocketAddress getRemoteAddress();
+
+    /**
+     * Returns the socket address of local machine which is associated with this
+     * session.
+     */
+    SocketAddress getLocalAddress();
+
+    /**
+     * Write a packet on the socket.
+     */
+    IoWriteFuture write(Buffer buffer);
+
+
+    /**
+     * Closes this session immediately or after all queued write requests
+     * are flushed.  This operation is asynchronous.  Wait for the returned
+     * {@link CloseFuture} if you want to wait for the session actually closed.
+     *
+     * @param immediately {@code true} to close this session immediately.
+     *                    The pending write requests will simply be discarded.
+     *                    {@code false} to close this session after all queued
+     *                    write requests are flushed.
+     */
+    IoCloseFuture close(boolean immediately);
+
+    /**
+     * Returns the IoService that created this session.
+     */
+    IoService getService();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
new file mode 100644
index 0000000..79d1ae2
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.future.SshFuture;
+
+public interface IoWriteFuture extends SshFuture<IoWriteFuture> {
+
+    /**
+     * Returns <tt>true</tt> if the write operation is finished successfully.
+     */
+    boolean isWritten();
+
+    /**
+     * Returns the cause of the write failure if and only if the write
+     * operation has failed due to an {@link Exception}.  Otherwise,
+     * <tt>null</tt> is returned.
+     */
+    Throwable getException();
+
+    /**
+     * Sets the message is written, and notifies all threads waiting for
+     * this future.  This method is invoked by MINA internally.  Please do
+     * not call this method directly.
+     */
+    void setWritten();
+
+    /**
+     * Sets the cause of the write failure, and notifies all threads waiting
+     * for this future.  This method is invoked by MINA internally.  Please
+     * do not call this method directly.
+     */
+    void setException(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bb2eb2b5/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaAcceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaAcceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaAcceptor.java
new file mode 100644
index 0000000..5127f13
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaAcceptor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io.mina;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.session.IoSessionConfig;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.common.FactoryManager;
+
+/**
+ */
+public class MinaAcceptor extends MinaService implements org.apache.sshd.common.io.IoAcceptor, IoHandler {
+
+    protected volatile IoAcceptor acceptor;
+    // Acceptor
+    protected int backlog = 50;
+    protected boolean reuseAddress = true;
+    protected IoSessionConfig sessionConfig;
+
+    public MinaAcceptor(FactoryManager manager, org.apache.sshd.common.io.IoHandler handler) {
+        super(manager, handler);
+    }
+
+    protected IoAcceptor createAcceptor() {
+        NioSocketAcceptor acceptor = new NioSocketAcceptor(getNioWorkers());
+        acceptor.setCloseOnDeactivation(false);
+        acceptor.setReuseAddress(reuseAddress);
+        acceptor.setBacklog(backlog);
+
+        // MINA itself forces our socket receive buffer to 1024 bytes
+        // by default, despite what the operating system defaults to.
+        // This limits us to about 3 MB/s incoming data transfer.  By
+        // forcing back to the operating system default we can get a
+        // decent transfer rate again.
+        //
+        final Socket s = new Socket();
+        try {
+            try {
+                acceptor.getSessionConfig().setReceiveBufferSize(s.getReceiveBufferSize());
+            } finally {
+                s.close();
+            }
+        } catch (IOException e) {
+            log.warn("cannot adjust SO_RCVBUF back to system default", e);
+        }
+        if (sessionConfig != null) {
+            acceptor.getSessionConfig().setAll(sessionConfig);
+        }
+        return acceptor;
+    }
+
+    protected IoAcceptor getAcceptor() {
+        if (acceptor == null) {
+            synchronized (this) {
+                if (acceptor == null) {
+                    acceptor = createAcceptor();
+                    acceptor.setHandler(this);
+                }
+            }
+        }
+        return acceptor;
+    }
+
+    @Override
+    protected IoService getIoService() {
+        return getAcceptor();
+    }
+
+    public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
+        getAcceptor().bind(addresses);
+    }
+
+    public void bind(SocketAddress address) throws IOException {
+        getAcceptor().bind(address);
+    }
+
+    public void unbind() {
+        getAcceptor().unbind();
+    }
+
+    public void unbind(Collection<? extends SocketAddress> addresses) {
+        getAcceptor().unbind(addresses);
+    }
+
+    public void unbind(SocketAddress address) {
+        getAcceptor().unbind(address);
+    }
+
+    public Set<SocketAddress> getBoundAddresses() {
+        return getAcceptor().getLocalAddresses();
+    }
+
+}