You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/01/28 11:44:58 UTC

mina-sshd git commit: [SSHD-632] Provide more flexibility in overriding Nio2 related classes - e.g., acceptor, connector, session

Repository: mina-sshd
Updated Branches:
  refs/heads/master c644ad82d -> aa0d8d37f


[SSHD-632] Provide more flexibility in overriding Nio2 related classes - e.g., acceptor, connector, session


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa0d8d37
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa0d8d37
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa0d8d37

Branch: refs/heads/master
Commit: aa0d8d37fe12b368fb4110759e4e0dac4e683395
Parents: c644ad8
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 28 12:44:44 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 28 12:44:44 2016 +0200

----------------------------------------------------------------------
 .../sshd/common/FactoryManagerHolder.java       |  30 +++
 .../common/io/AbstractIoServiceFactory.java     |   6 +-
 .../sshd/common/io/nio2/Nio2Acceptor.java       |  56 +++-
 .../sshd/common/io/nio2/Nio2Connector.java      | 102 +++++---
 .../apache/sshd/common/io/nio2/Nio2Service.java |  26 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java | 261 ++++++++++++-------
 .../org/apache/sshd/common/session/Session.java |   8 +-
 7 files changed, 346 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
new file mode 100644
index 0000000..810ac09
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface FactoryManagerHolder {
+    /**
+     * @return The currently associated {@link FactoryManager}
+     */
+    FactoryManager getFactoryManager();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
index 73a91ca..33cb7ce 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
@@ -30,7 +31,9 @@ import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractIoServiceFactory extends AbstractCloseable implements IoServiceFactory, ExecutorServiceCarrier {
+public abstract class AbstractIoServiceFactory
+                extends AbstractCloseable
+                implements IoServiceFactory, FactoryManagerHolder, ExecutorServiceCarrier {
 
     private final FactoryManager manager;
     private final ExecutorService executor;
@@ -42,6 +45,7 @@ public abstract class AbstractIoServiceFactory extends AbstractCloseable impleme
         shutdownExecutor = shutdownOnExit;
     }
 
+    @Override
     public final FactoryManager getFactoryManager() {
         return manager;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
index ebc590d..8656c51 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -24,6 +24,7 @@ import java.net.StandardSocketOptions;
 import java.nio.channels.AsynchronousChannelGroup;
 import java.nio.channels.AsynchronousServerSocketChannel;
 import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,26 +37,29 @@ import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  */
 public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
     public static final int DEFAULT_BACKLOG = 0;
 
-    private final Map<SocketAddress, AsynchronousServerSocketChannel> channels;
+    protected final Map<SocketAddress, AsynchronousServerSocketChannel> channels = new ConcurrentHashMap<>();
     private int backlog = DEFAULT_BACKLOG;
 
     public Nio2Acceptor(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
         super(manager, handler, group);
-        channels = new ConcurrentHashMap<SocketAddress, AsynchronousServerSocketChannel>();
         backlog = PropertyResolverUtils.getIntProperty(manager, FactoryManager.SOCKET_BACKLOG, DEFAULT_BACKLOG);
     }
 
     @Override
     public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
+        AsynchronousChannelGroup group = getChannelGroup();
         for (SocketAddress address : addresses) {
-            log.debug("Binding Nio2Acceptor to address {}", address);
-            AsynchronousServerSocketChannel socket = AsynchronousServerSocketChannel.open(group);
+            if (log.isDebugEnabled()) {
+                log.debug("Binding Nio2Acceptor to address {}", address);
+            }
+            AsynchronousServerSocketChannel socket = openAsynchronousServerSocketChannel(address, group);
             setOption(socket, FactoryManager.SOCKET_KEEPALIVE, StandardSocketOptions.SO_KEEPALIVE, null);
             setOption(socket, FactoryManager.SOCKET_LINGER, StandardSocketOptions.SO_LINGER, null);
             setOption(socket, FactoryManager.SOCKET_RCVBUF, StandardSocketOptions.SO_RCVBUF, null);
@@ -65,10 +69,25 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
             socket.bind(address, backlog);
             SocketAddress local = socket.getLocalAddress();
             channels.put(local, socket);
-            socket.accept(local, new AcceptCompletionHandler(socket));
+
+            CompletionHandler<AsynchronousSocketChannel, ? super SocketAddress> handler =
+                    ValidateUtils.checkNotNull(createSocketCompletionHandler(channels, socket),
+                                               "No completion handler created for address=%s",
+                                               address);
+            socket.accept(local, handler);
         }
     }
 
+    protected AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(
+            SocketAddress address, AsynchronousChannelGroup group) throws IOException {
+        return AsynchronousServerSocketChannel.open(group);
+    }
+
+    protected CompletionHandler<AsynchronousSocketChannel, ? super SocketAddress> createSocketCompletionHandler(
+            Map<SocketAddress, AsynchronousServerSocketChannel> channelsMap, AsynchronousServerSocketChannel socket) throws IOException {
+        return new AcceptCompletionHandler(socket);
+    }
+
     @Override
     public void bind(SocketAddress address) throws IOException {
         bind(Collections.singleton(address));
@@ -133,15 +152,15 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         super.doCloseImmediately();
     }
 
-    class AcceptCompletionHandler extends Nio2CompletionHandler<AsynchronousSocketChannel, SocketAddress> {
-        private final AsynchronousServerSocketChannel socket;
+    protected class AcceptCompletionHandler extends Nio2CompletionHandler<AsynchronousSocketChannel, SocketAddress> {
+        protected final AsynchronousServerSocketChannel socket;
 
         AcceptCompletionHandler(AsynchronousServerSocketChannel socket) {
             this.socket = socket;
         }
 
-        @SuppressWarnings("synthetic-access")
         @Override
+        @SuppressWarnings("synthetic-access")
         protected void onCompleted(AsynchronousSocketChannel result, SocketAddress address) {
             // Verify that the address has not been unbound
             if (!channels.containsKey(address)) {
@@ -151,7 +170,8 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
             Nio2Session session = null;
             try {
                 // Create a session
-                session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
+                IoHandler handler = getIoHandler();
+                session = ValidateUtils.checkNotNull(createSession(Nio2Acceptor.this, address, result, handler), "No NIO2 session created");
                 handler.sessionCreated(session);
                 sessions.put(session.getId(), session);
                 session.startReading();
@@ -164,9 +184,9 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
                         session.close();
                     } catch (Throwable t) {
                         log.warn("Failed (" + t.getClass().getSimpleName() + ")"
-                                        + " to close accepted connection from " + address
-                                        + ": " + t.getMessage(),
-                                t);
+                               + " to close accepted connection from " + address
+                               + ": " + t.getMessage(),
+                                 t);
                     }
                 }
             }
@@ -180,12 +200,20 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         }
 
         @SuppressWarnings("synthetic-access")
+        protected Nio2Session createSession(Nio2Acceptor acceptor, SocketAddress address, AsynchronousSocketChannel channel, IoHandler handler) throws Throwable {
+            if (log.isTraceEnabled()) {
+                log.trace("createNio2Session({}) address={}", acceptor, address);
+            }
+            return new Nio2Session(acceptor, getFactoryManager(), handler, channel);
+        }
+
         @Override
+        @SuppressWarnings("synthetic-access")
         protected void onFailed(final Throwable exc, final SocketAddress address) {
             if (channels.containsKey(address) && !disposing.get()) {
                 log.warn("Caught " + exc.getClass().getSimpleName()
-                                + " while accepting incoming connection from " + address
-                                + ": " + exc.getMessage(),
+                       + " while accepting incoming connection from " + address
+                       + ": " + exc.getMessage(),
                         exc);
             }
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
index 3a2f03e..2c2b6a5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
@@ -30,59 +30,102 @@ import org.apache.sshd.common.io.IoConnectFuture;
 import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  */
 public class Nio2Connector extends Nio2Service implements IoConnector {
-
     public Nio2Connector(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
         super(manager, handler, group);
     }
 
     @Override
     public IoConnectFuture connect(SocketAddress address) {
-        log.debug("Connecting to {}", address);
-        final IoConnectFuture future = new DefaultIoConnectFuture(null);
+        if (log.isDebugEnabled()) {
+            log.debug("Connecting to {}", address);
+        }
+
+        IoConnectFuture future = new DefaultIoConnectFuture(null);
         try {
-            final AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(group);
+            AsynchronousChannelGroup group = getChannelGroup();
+            AsynchronousSocketChannel socket = openAsynchronousSocketChannel(address, group);
             setOption(socket, FactoryManager.SOCKET_KEEPALIVE, StandardSocketOptions.SO_KEEPALIVE, null);
             setOption(socket, FactoryManager.SOCKET_LINGER, StandardSocketOptions.SO_LINGER, null);
             setOption(socket, FactoryManager.SOCKET_RCVBUF, StandardSocketOptions.SO_RCVBUF, null);
             setOption(socket, FactoryManager.SOCKET_REUSEADDR, StandardSocketOptions.SO_REUSEADDR, Boolean.TRUE);
             setOption(socket, FactoryManager.SOCKET_SNDBUF, StandardSocketOptions.SO_SNDBUF, null);
             setOption(socket, FactoryManager.TCP_NODELAY, StandardSocketOptions.TCP_NODELAY, null);
-            socket.connect(address, null, new Nio2CompletionHandler<Void, Object>() {
-                @Override
-                protected void onCompleted(Void result, Object attachment) {
+
+            Nio2CompletionHandler<Void, Object> completionHandler =
+                    ValidateUtils.checkNotNull(createConnectionCompletionHandler(future, socket, getFactoryManager(), getIoHandler()),
+                                               "No connection completion handler created for %s",
+                                               address);
+            socket.connect(address, null, completionHandler);
+        } catch (Throwable exc) {
+            Throwable t = GenericUtils.peelException(exc);
+            if (log.isDebugEnabled()) {
+                log.debug("connect({}) failed ({}) to schedule connection: {}",
+                          address, t.getClass().getSimpleName(), t.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("connect(" + address + ") connection failure details", t);
+            }
+            future.setException(t);
+        }
+        return future;
+    }
+
+    protected AsynchronousSocketChannel openAsynchronousSocketChannel(
+            SocketAddress address, AsynchronousChannelGroup group) throws IOException {
+        return AsynchronousSocketChannel.open(group);
+    }
+
+    protected Nio2CompletionHandler<Void, Object> createConnectionCompletionHandler(
+            final IoConnectFuture future, final AsynchronousSocketChannel socket, final FactoryManager manager, final IoHandler handler) {
+        return new Nio2CompletionHandler<Void, Object>() {
+            @Override
+            @SuppressWarnings("synthetic-access")
+            protected void onCompleted(Void result, Object attachment) {
+                try {
+                    Nio2Session session = createSession(manager, handler, socket);
+                    handler.sessionCreated(session);
+                    sessions.put(session.getId(), session);
+                    future.setSession(session);
+                    session.startReading();
+                } catch (Throwable exc) {
+                    Throwable t = GenericUtils.peelException(exc);
+                    if (log.isDebugEnabled()) {
+                        log.debug("onCompleted - failed {} to start session: {}",
+                                  t.getClass().getSimpleName(), t.getMessage());
+                    }
+                    if (log.isTraceEnabled()) {
+                        log.trace("onCompleted - session creation failure details", t);
+                    }
                     try {
-                        Nio2Session session = new Nio2Session(Nio2Connector.this, manager, handler, socket);
-                        handler.sessionCreated(session);
-                        sessions.put(session.getId(), session);
-                        future.setSession(session);
-                        session.startReading();
-                    } catch (Throwable e) {
-                        try {
-                            socket.close();
-                        } catch (IOException t) {
-                            // Ignore
+                        socket.close();
+                    } catch (IOException err) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("onCompleted - failed {} to close socket: {}", err.getClass().getSimpleName(), err.getMessage());
                         }
-                        future.setException(e);
                     }
+                    future.setException(t);
                 }
+            }
 
-                @Override
-                protected void onFailed(final Throwable exc, final Object attachment) {
-                    future.setException(exc);
-                }
-            });
-        } catch (IOException exc) {
-            future.setException(exc);
-        }
-        return future;
+            @Override
+            protected void onFailed(final Throwable exc, final Object attachment) {
+                future.setException(exc);
+            }
+        };
+    }
+
+    protected Nio2Session createSession(FactoryManager manager, IoHandler handler, AsynchronousSocketChannel socket) throws Throwable {
+        return new Nio2Session(this, manager, handler, socket);
     }
 
-    static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
-        DefaultIoConnectFuture(Object lock) {
+    public static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
+        public DefaultIoConnectFuture(Object lock) {
             super(lock);
         }
 
@@ -113,5 +156,4 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
             setValue(exception);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 1df3731..080369a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
+import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoService;
@@ -42,12 +44,12 @@ import org.apache.sshd.common.util.closeable.CloseableUtils;
 
 /**
  */
-public abstract class Nio2Service extends AbstractInnerCloseable implements IoService {
-    protected final FactoryManager manager;
-    protected final IoHandler handler;
+public abstract class Nio2Service extends AbstractInnerCloseable implements IoService, FactoryManagerHolder {
     protected final Map<Long, IoSession> sessions;
     protected final AtomicBoolean disposing = new AtomicBoolean();
-    protected final AsynchronousChannelGroup group;
+    private final FactoryManager manager;
+    private final IoHandler handler;
+    private final AsynchronousChannelGroup group;
 
     protected Nio2Service(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
         if (log.isTraceEnabled()) {
@@ -59,9 +61,22 @@ public abstract class Nio2Service extends AbstractInnerCloseable implements IoSe
         this.sessions = new ConcurrentHashMap<>();
     }
 
+    protected AsynchronousChannelGroup getChannelGroup() {
+        return group;
+    }
+
+    @Override
+    public FactoryManager getFactoryManager() {
+        return manager;
+    }
+
+    public IoHandler getIoHandler() {
+        return handler;
+    }
+
     public void dispose() {
         try {
-            long maxWait = CloseableUtils.getMaxCloseWaitTime(manager);
+            long maxWait = CloseableUtils.getMaxCloseWaitTime(getFactoryManager());
             boolean successful = close(true).await(maxWait);
             if (!successful) {
                 throw new SocketTimeoutException("Failed to receive closure confirmation within " + maxWait + " millis");
@@ -92,6 +107,7 @@ public abstract class Nio2Service extends AbstractInnerCloseable implements IoSe
     }
 
     protected <T> void setOption(NetworkChannel socket, String property, SocketOption<T> option, T defaultValue) throws IOException {
+        PropertyResolver manager = getFactoryManager();
         String valStr = PropertyResolverUtils.getString(manager, property);
         T val = defaultValue;
         if (!GenericUtils.isEmpty(valStr)) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 5629d63..474251f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -32,12 +32,14 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoHandler;
-import org.apache.sshd.common.io.IoService;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
@@ -51,8 +53,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
 
     private final long id = SESSION_ID_GENERATOR.incrementAndGet();
     private final Nio2Service service;
-    private final IoHandler handler;
-    private final AsynchronousSocketChannel socket;
+    private final IoHandler ioHandler;
+    private final AsynchronousSocketChannel socketChannel;
     private final Map<Object, Object> attributes = new HashMap<Object, Object>();
     private final SocketAddress localAddress;
     private final SocketAddress remoteAddress;
@@ -61,13 +63,15 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
     private final AtomicReference<Nio2DefaultIoWriteFuture> currentWrite = new AtomicReference<>();
 
     public Nio2Session(Nio2Service service, FactoryManager manager, IoHandler handler, AsynchronousSocketChannel socket) throws IOException {
-        this.service = service;
-        this.manager = manager;
-        this.handler = handler;
-        this.socket = socket;
+        this.service = ValidateUtils.checkNotNull(service, "No service instance");
+        this.manager = ValidateUtils.checkNotNull(manager, "No factory manager");
+        this.ioHandler = ValidateUtils.checkNotNull(handler, "No IoHandler");
+        this.socketChannel = ValidateUtils.checkNotNull(socket, "No socket channel");
         this.localAddress = socket.getLocalAddress();
         this.remoteAddress = socket.getRemoteAddress();
-        log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
+        if (log.isDebugEnabled()) {
+            log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
+        }
     }
 
     @Override
@@ -95,23 +99,39 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
         return localAddress;
     }
 
+    public AsynchronousSocketChannel getSocket() {
+        return socketChannel;
+    }
+
+    public IoHandler getIoHandler() {
+        return ioHandler;
+    }
+
     public void suspend() {
+        AsynchronousSocketChannel socket = getSocket();
         try {
-            this.socket.shutdownInput();
+            socket.shutdownInput();
         } catch (IOException e) {
-            // Ignore
+            if (log.isDebugEnabled()) {
+                log.debug("suspend({}) failed ({}) to shutdown input: {}",
+                          this, e.getClass().getSimpleName(), e.getMessage());
+            }
         }
+
         try {
-            this.socket.shutdownOutput();
+            socket.shutdownOutput();
         } catch (IOException e) {
-            // Ignore
+            if (log.isDebugEnabled()) {
+                log.debug("suspend({}) failed ({}) to shutdown output: {}",
+                          this, e.getClass().getSimpleName(), e.getMessage());
+            }
         }
     }
 
     @Override
     public IoWriteFuture write(Buffer buffer) {
         if (log.isDebugEnabled()) {
-            log.debug("Writing {} bytes", Integer.valueOf(buffer.available()));
+            log.debug("Writing {} bytes", buffer.available());
         }
 
         ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
@@ -127,20 +147,28 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
         return future;
     }
 
-    private void exceptionCaught(Throwable exc) {
+    protected void exceptionCaught(Throwable exc) {
         if (!closeFuture.isClosed()) {
+            AsynchronousSocketChannel socket = getSocket();
             if (isClosing() || !socket.isOpen()) {
                 close(true);
             } else {
+                IoHandler handler = getIoHandler();
                 try {
                     if (log.isDebugEnabled()) {
-                        log.debug("Caught {}[{}] - calling handler", exc.getClass().getSimpleName(), exc.getMessage());
+                        log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
+                                  this, exc.getClass().getSimpleName(), exc.getMessage());
                     }
                     handler.exceptionCaught(this, exc);
-                } catch (Throwable t) {
+                } catch (Throwable e) {
+                    Throwable t = GenericUtils.peelException(e);
                     if (log.isDebugEnabled()) {
-                        log.debug("Exception handler threw {}, closing the session: {}",
-                                  t.getClass().getSimpleName(), t.getMessage());
+                        log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
+                                  this, t.getClass().getSimpleName(), t.getMessage());
+                    }
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
                     }
                     close(true);
                 }
@@ -163,23 +191,34 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
                 break;
             }
         }
+
+        AsynchronousSocketChannel socket = getSocket();
         try {
             socket.close();
         } catch (IOException e) {
-            log.info("Exception caught while closing socket", e);
+            log.info("doCloseImmediately(" + this + ") exception caught while closing socket", e);
         }
+
         service.sessionClosed(this);
         super.doCloseImmediately();
+
+        IoHandler handler = getIoHandler();
         try {
             handler.sessionClosed(this);
-        } catch (Exception e) {
-            // Ignore
-            log.debug("Exception caught while calling IoHandler#sessionClosed", e);
+        } catch (Throwable e) {
+            if (log.isDebugEnabled()) {
+                log.debug("doCloseImmediately({}) {} while calling IoHandler#sessionClosed: {}",
+                          this, e.getClass().getSimpleName(), e.getMessage());
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("doCloseImmediately(" + this + ") IoHandler#sessionClosed failure details", e);
+            }
         }
     }
 
-    @Override
-    public IoService getService() {
+    @Override   // co-variant return
+    public Nio2Service getService() {
         return service;
     }
 
@@ -213,93 +252,141 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
         });
     }
 
-    protected void doReadCycle(final ByteBuffer buffer, final Readable bufReader) {
-        final Nio2CompletionHandler<Integer, Object> completion = new Nio2CompletionHandler<Integer, Object>() {
+    protected void doReadCycle(ByteBuffer buffer, Readable bufReader) {
+        Nio2CompletionHandler<Integer, Object> completion =
+                ValidateUtils.checkNotNull(createReadCycleCompletionHandler(buffer, bufReader), "No completion handler created");
+        doReadCycle(buffer, completion);
+    }
+
+    protected Nio2CompletionHandler<Integer, Object> createReadCycleCompletionHandler(final ByteBuffer buffer, final Readable bufReader) {
+        return new Nio2CompletionHandler<Integer, Object>() {
             @Override
-            @SuppressWarnings("synthetic-access")
             protected void onCompleted(Integer result, Object attachment) {
-                try {
-                    if (result >= 0) {
-                        log.debug("Read {} bytes", result);
-                        buffer.flip();
-                        handler.messageReceived(Nio2Session.this, bufReader);
-                        if (!closeFuture.isClosed()) {
-                            // re-use reference for next iteration since we finished processing it
-                            buffer.clear();
-                            doReadCycle(buffer, this);
-                        } else {
-                            log.debug("IoSession has been closed, stop reading");
-                        }
-                    } else {
-                        log.debug("Socket has been disconnected, closing IoSession now");
-                        Nio2Session.this.close(true);
-                    }
-                } catch (Throwable exc) {
-                    failed(exc, attachment);
-                }
+                handleReadCycleCompletion(buffer, bufReader, this, result, attachment);
             }
 
             @Override
-            @SuppressWarnings("synthetic-access")
             protected void onFailed(Throwable exc, Object attachment) {
-                exceptionCaught(exc);
+                handleReadCycleFailure(buffer, bufReader, exc, attachment);
             }
         };
-        doReadCycle(buffer, completion);
+    }
+
+    protected void handleReadCycleCompletion(
+            ByteBuffer buffer, Readable bufReader, Nio2CompletionHandler<Integer, Object> completionHandler, Integer result, Object attachment) {
+        try {
+            if (result >= 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("handleReadCycleCompletion({}) read {} bytes", this, result);
+                }
+                buffer.flip();
+
+                IoHandler handler = getIoHandler();
+                handler.messageReceived(this, bufReader);
+                if (!closeFuture.isClosed()) {
+                    // re-use reference for next iteration since we finished processing it
+                    buffer.clear();
+                    doReadCycle(buffer, completionHandler);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("handleReadCycleCompletion({}) IoSession has been closed, stop reading", this);
+                    }
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("handleReadCycleCompletion({}) Socket has been disconnected (result={}), closing IoSession now", this, result);
+                }
+                close(true);
+            }
+        } catch (Throwable exc) {
+            completionHandler.failed(exc, attachment);
+        }
+    }
+
+    protected void handleReadCycleFailure(ByteBuffer buffer, Readable bufReader, Throwable exc, Object attachment) {
+        exceptionCaught(exc);
     }
 
     protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
+        AsynchronousSocketChannel socket = getSocket();
         socket.read(buffer, null, completion);
     }
 
-    @SuppressWarnings("synthetic-access")
-    private void startWriting() {
-        final Nio2DefaultIoWriteFuture future = writes.peek();
+    protected void startWriting() {
+        Nio2DefaultIoWriteFuture future = writes.peek();
         if (future != null) {
             if (currentWrite.compareAndSet(null, future)) {
                 try {
-                    final ByteBuffer buffer = future.getBuffer();
-                    socket.write(buffer, null, new Nio2CompletionHandler<Integer, Object>() {
-                        @Override
-                        protected void onCompleted(Integer result, Object attachment) {
-                            if (buffer.hasRemaining()) {
-                                try {
-                                    socket.write(buffer, null, this);
-                                } catch (Throwable t) {
-                                    log.debug("Exception caught while writing", t);
-                                    future.setWritten();
-                                    finishWrite();
-                                }
-                            } else {
-                                log.debug("Finished writing");
-                                future.setWritten();
-                                finishWrite();
-                            }
-                        }
-
-                        @Override
-                        protected void onFailed(Throwable exc, Object attachment) {
-                            future.setException(exc);
-                            exceptionCaught(exc);
-                            finishWrite();
-                        }
-
-                        private void finishWrite() {
-                            writes.remove(future);
-                            currentWrite.compareAndSet(future, null);
-                            startWriting();
-                        }
-                    });
-                } catch (RuntimeException e) {
+                    ByteBuffer buffer = future.getBuffer();
+                    AsynchronousSocketChannel socket = getSocket();
+                    Nio2CompletionHandler<Integer, Object> handler =
+                            ValidateUtils.checkNotNull(createWriteCycleCompletionHandler(future, socket, buffer),
+                                                       "No write cycle completion handler created");
+                    socket.write(buffer, null, handler);
+                } catch (Throwable e) {
                     future.setWritten();
-                    throw e;
+                    if (e instanceof RuntimeException) {
+                        throw e;
+                    } else {
+                        throw new RuntimeSshException(e);
+                    }
                 }
             }
         }
     }
 
+    protected Nio2CompletionHandler<Integer, Object> createWriteCycleCompletionHandler(
+            final Nio2DefaultIoWriteFuture future, final AsynchronousSocketChannel socket, final ByteBuffer buffer) {
+        return new Nio2CompletionHandler<Integer, Object>() {
+            @Override
+            protected void onCompleted(Integer result, Object attachment) {
+                handleCompletedWriteCycle(future, socket, buffer, this, result, attachment);
+            }
+
+            @Override
+            protected void onFailed(Throwable exc, Object attachment) {
+                handleWriteCycleFailure(future, socket, buffer, exc, attachment);
+            }
+        };
+    }
+
+    protected void handleCompletedWriteCycle(
+            Nio2DefaultIoWriteFuture future, AsynchronousSocketChannel socket, ByteBuffer buffer,
+            Nio2CompletionHandler<Integer, Object> completionHandler, Integer result, Object attachment) {
+        if (buffer.hasRemaining()) {
+            try {
+                socket.write(buffer, null, completionHandler);
+            } catch (Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("handleCompletedWriteCycle(" + this + ") Exception caught while writing", t);
+                }
+                future.setWritten();
+                finishWrite(future);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("handleCompletedWriteCycle({}) finished writing", this);
+            }
+            future.setWritten();
+            finishWrite(future);
+        }
+    }
+
+    protected void handleWriteCycleFailure(
+            Nio2DefaultIoWriteFuture future, AsynchronousSocketChannel socket, ByteBuffer buffer, Throwable exc, Object attachment) {
+        future.setException(exc);
+        exceptionCaught(exc);
+        finishWrite(future);
+    }
+
+    protected void finishWrite(Nio2DefaultIoWriteFuture future) {
+        writes.remove(future);
+        currentWrite.compareAndSet(future, null);
+        startWriting();
+    }
+
     @Override
     public String toString() {
-        return getClass().getSimpleName() + "[local=" + localAddress + ", remote=" + remoteAddress + "]";
+        return getClass().getSimpleName() + "[local=" + getLocalAddress() + ", remote=" + getRemoteAddress() + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index d47553b..2729cf6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.auth.MutableUserHolder;
@@ -48,6 +48,7 @@ public interface Session
         extends KexFactoryManager,
                 SessionListenerManager,
                 ChannelListenerManager,
+                FactoryManagerHolder,
                 PropertyResolver,
                 Closeable,
                 MutableUserHolder {
@@ -102,11 +103,6 @@ public interface Session
     String getServerVersion();
 
     /**
-     * @return the {@link FactoryManager} that has created this session, can not be {@code null}
-     */
-    FactoryManager getFactoryManager();
-
-    /**
      * Retrieve one of the negotiated values during the KEX stage
      *
      * @param paramType The request {@link KexProposalOption} value - ignored