You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/07/25 12:58:15 UTC

[2/3] mina-sshd git commit: [SSHD-835] Introduce a Closeable ExecutorService

[SSHD-835] Introduce a Closeable ExecutorService


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/7b35bb36
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/7b35bb36
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/7b35bb36

Branch: refs/heads/master
Commit: 7b35bb360410f21be06ad16a7d0a0a0d7cc012af
Parents: 101a509
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu May 31 09:51:49 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Wed Jul 25 14:37:52 2018 +0200

----------------------------------------------------------------------
 .../sshd/agent/common/AbstractAgentProxy.java   |  27 +-
 .../sshd/agent/local/AgentForwardedChannel.java |   2 +-
 .../agent/local/ChannelAgentForwarding.java     |   6 +-
 .../local/ChannelAgentForwardingFactory.java    |   2 +-
 .../org/apache/sshd/agent/unix/AgentClient.java |  12 +-
 .../org/apache/sshd/agent/unix/AgentServer.java |  22 +-
 .../sshd/agent/unix/AgentServerProxy.java       |  23 +-
 .../sshd/agent/unix/ChannelAgentForwarding.java |  16 +-
 .../unix/ChannelAgentForwardingFactory.java     |  29 +-
 .../sshd/agent/unix/UnixAgentFactory.java       |  59 ++--
 .../client/channel/AbstractClientChannel.java   |   6 +
 .../sshd/client/channel/ChannelSession.java     |  10 +-
 .../client/session/AbstractClientSession.java   |   4 +-
 .../client/session/ClientConnectionService.java |   9 +-
 .../sshd/common/channel/AbstractChannel.java    |  31 +--
 .../common/io/AbstractIoServiceFactory.java     |  18 +-
 .../io/AbstractIoServiceFactoryFactory.java     |  45 +--
 .../io/DefaultIoServiceFactoryFactory.java      |  17 +-
 .../sshd/common/io/IoServiceFactoryFactory.java |   5 +-
 .../sshd/common/io/nio2/Nio2ServiceFactory.java |  19 +-
 .../io/nio2/Nio2ServiceFactoryFactory.java      |  14 +-
 .../helpers/AbstractConnectionService.java      |  38 ++-
 .../common/util/threads/ExecutorService.java    |  26 ++
 .../util/threads/ExecutorServiceCarrier.java    |   8 -
 .../util/threads/ExecutorServiceConfigurer.java |  31 ---
 .../sshd/common/util/threads/ThreadUtils.java   | 276 ++++++++++++++++---
 .../server/channel/AbstractServerChannel.java   |  17 +-
 .../sshd/server/channel/ChannelSession.java     |   9 +-
 .../server/command/AbstractCommandSupport.java  |  14 +-
 .../command/AbstractFileSystemCommand.java      |   6 +-
 .../sshd/server/forward/DirectTcpipFactory.java |   1 +
 .../server/forward/ForwardedTcpipFactory.java   |   1 +
 .../sshd/server/forward/TcpipServerChannel.java |  43 +--
 .../server/session/ServerConnectionService.java |   8 +-
 .../forward/AbstractServerCloseTestSupport.java |  15 +-
 .../io/DefaultIoServiceFactoryFactoryTest.java  |  22 +-
 .../sshd/common/util/ThreadUtilsTest.java       |   5 +-
 .../java/org/apache/sshd/server/ServerTest.java |   2 +-
 .../sshd/util/test/CommandExecutionHelper.java  |   2 +-
 .../org/apache/sshd/git/AbstractGitCommand.java |   6 +-
 .../sshd/git/AbstractGitCommandFactory.java     |  14 +-
 .../apache/sshd/git/pack/GitPackCommand.java    |   8 +-
 .../sshd/git/pack/GitPackCommandFactory.java    |  10 +-
 .../org/apache/sshd/git/pgm/GitPgmCommand.java  |   8 +-
 .../sshd/git/pgm/GitPgmCommandFactory.java      |  10 +-
 .../sshd/common/io/mina/MinaServiceFactory.java |   9 +-
 .../io/mina/MinaServiceFactoryFactory.java      |  21 +-
 .../netty/NettyIoServiceFactoryFactory.java     |   5 +-
 .../sshd/client/scp/AbstractScpClient.java      |   4 +
 .../org/apache/sshd/server/scp/ScpCommand.java  |   9 +-
 .../sshd/server/scp/ScpCommandFactory.java      |  26 +-
 .../org/apache/sshd/client/scp/ScpTest.java     |   8 +-
 .../sshd/server/scp/ScpCommandFactoryTest.java  |   6 +-
 .../server/subsystem/sftp/SftpSubsystem.java    |  17 +-
 .../subsystem/sftp/SftpSubsystemFactory.java    |  33 +--
 .../client/subsystem/sftp/SftpVersionsTest.java |   4 +-
 .../SpaceAvailableExtensionImplTest.java        |   2 +-
 .../openssh/helpers/OpenSSHExtensionsTest.java  |   2 +-
 .../sftp/SftpSubsystemFactoryTest.java          |   6 +-
 59 files changed, 556 insertions(+), 552 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
index 028b48e..0432eec 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.SshAgentConstants;
@@ -38,19 +37,20 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceConfigurer {
+public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceCarrier {
 
     private ExecutorService executor;
-    private boolean shutdownExecutor;
     private String channelType = FactoryManager.AGENT_FORWARDING_TYPE_OPENSSH;
 
-    protected AbstractAgentProxy() {
+    protected AbstractAgentProxy(ExecutorService executorService) {
         super();
+        executor = executorService;
     }
 
     public String getChannelType() {
@@ -67,21 +67,6 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements
     }
 
     @Override
-    public void setExecutorService(ExecutorService service) {
-        executor = service;
-    }
-
-    @Override
-    public boolean isShutdownOnExit() {
-        return shutdownExecutor;
-    }
-
-    @Override
-    public void setShutdownOnExit(boolean shutdown) {
-        shutdownExecutor = shutdown;
-    }
-
-    @Override
     public List<? extends Map.Entry<PublicKey, String>> getIdentities() throws IOException {
         int cmd = SshAgentConstants.SSH2_AGENTC_REQUEST_IDENTITIES;
         int okcmd = SshAgentConstants.SSH2_AGENT_IDENTITIES_ANSWER;
@@ -213,7 +198,7 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements
     @Override
     public void close() throws IOException {
         ExecutorService service = getExecutorService();
-        if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+        if ((service != null) && (!service.isShutdown())) {
             Collection<?> runners = service.shutdownNow();
             if (log.isDebugEnabled()) {
                 log.debug("close() - shutdown runners count=" + GenericUtils.size(runners));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 9306bc8..7657937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -46,7 +46,7 @@ public class AgentForwardedChannel extends AbstractClientChannel {
     }
 
     public SshAgent getAgent() {
-        AbstractAgentProxy rtn = new AbstractAgentProxy() {
+        AbstractAgentProxy rtn = new AbstractAgentProxy(null) {
             private final AtomicBoolean open = new AtomicBoolean(true);
 
             @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index a1868c7..e051f1e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -20,6 +20,7 @@ package org.apache.sshd.agent.local;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.Objects;
 
 import org.apache.sshd.agent.SshAgent;
@@ -36,6 +37,7 @@ import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 
 /**
@@ -46,8 +48,8 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     private SshAgent agent;
     private AgentClient client;
 
-    public ChannelAgentForwarding() {
-        super();
+    public ChannelAgentForwarding(ExecutorService executor) {
+        super("", Collections.emptyList(), executor);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
index 1396b61..40f9135 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
@@ -44,6 +44,6 @@ public class ChannelAgentForwardingFactory implements ChannelFactory {
 
     @Override
     public Channel create() {
-        return new ChannelAgentForwarding();
+        return new ChannelAgentForwarding(null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
index 97acd90..7f80a89 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,6 +29,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
@@ -50,15 +50,13 @@ public class AgentClient extends AbstractAgentProxy implements Runnable {
     private final AtomicBoolean open = new AtomicBoolean(true);
 
     public AgentClient(String authSocket) throws IOException {
-        this(authSocket, null, false);
+        this(authSocket, null);
     }
 
-    public AgentClient(String authSocket, ExecutorService executor, boolean shutdownOnExit) throws IOException {
+    public AgentClient(String authSocket, ExecutorService executor) throws IOException {
+        super((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor);
         this.authSocket = authSocket;
 
-        setExecutorService((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor);
-        setShutdownOnExit((executor == null) || shutdownOnExit);
-
         try {
             pool = Pool.create(AprLibrary.getInstance().getRootPool());
             handle = Local.create(authSocket, pool);
@@ -141,7 +139,7 @@ public class AgentClient extends AbstractAgentProxy implements Runnable {
             Socket.close(handle);
         }
 
-        if ((pumper != null) && isShutdownOnExit() && (!pumper.isDone())) {
+        if ((pumper != null) && (!pumper.isDone())) {
             pumper.cancel(true);
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
index 113ea23..6c6e13a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
@@ -21,7 +21,6 @@ package org.apache.sshd.agent.unix;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.sshd.agent.SshAgent;
@@ -31,6 +30,7 @@ import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
@@ -46,24 +46,23 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
 
     private final SshAgent agent;
     private final ExecutorService service;
-    private final boolean shutdownExecutor;
     private Future<?> agentThread;
     private String authSocket;
     private long pool;
     private long handle;
 
     public AgentServer() {
-        this(null, false);
+        this(null);
     }
 
-    public AgentServer(ExecutorService executor, boolean shutdownOnExit) {
-        this(new AgentImpl(), executor, shutdownOnExit);
+    public AgentServer(ExecutorService executor) {
+        this(new AgentImpl(), executor);
     }
 
-    public AgentServer(SshAgent agent, ExecutorService executor, boolean shutdownOnExit) {
+    public AgentServer(SshAgent agent, ExecutorService executor) {
         this.agent = agent;
-        this.service = (executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor;
-        this.shutdownExecutor = service != executor || shutdownOnExit;
+        this.service = (executor == null)
+                ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor;
     }
 
     public SshAgent getAgent() {
@@ -75,11 +74,6 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
         return service;
     }
 
-    @Override
-    public boolean isShutdownOnExit() {
-        return shutdownExecutor;
-    }
-
     public String start() throws Exception {
         authSocket = AprLibrary.createLocalSocketAddress();
         pool = Pool.create(AprLibrary.getInstance().getRootPool());
@@ -129,7 +123,7 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
         }
 
         ExecutorService executor = getExecutorService();
-        if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) {
+        if ((executor != null) && (!executor.isShutdown())) {
             Collection<?> runners = executor.shutdownNow();
             if (log.isDebugEnabled()) {
                 log.debug("Shut down runners count=" + GenericUtils.size(runners));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
index 5b66e57..4401a0d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -33,7 +32,7 @@ import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.OsUtils;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.tomcat.jni.Local;
 import org.apache.tomcat.jni.Pool;
@@ -43,7 +42,7 @@ import org.apache.tomcat.jni.Status;
 /**
  * The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side.
  */
-public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer, ExecutorServiceCarrier {
+public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer {
     /**
      * Property that can be set on the {@link Session} in order to control
      * the authentication timeout (millis). If not specified then
@@ -61,15 +60,14 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
     private final long handle;
     private Future<?> piper;
     private final ExecutorService pipeService;
-    private final boolean pipeCloseOnExit;
     private final AtomicBoolean open = new AtomicBoolean(true);
     private final AtomicBoolean innerFinished = new AtomicBoolean(false);
 
     public AgentServerProxy(ConnectionService service) throws IOException {
-        this(service, null, false);
+        this(service, null);
     }
 
-    public AgentServerProxy(ConnectionService service, ExecutorService executor, boolean shutdownOnExit) throws IOException {
+    public AgentServerProxy(ConnectionService service, ExecutorService executor) throws IOException {
         this.service = service;
         try {
             String authSocket = AprLibrary.createLocalSocketAddress();
@@ -89,8 +87,9 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
                 throw toIOException(result);
             }
 
-            pipeService = (executor == null) ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) : executor;
-            pipeCloseOnExit = executor != pipeService || shutdownOnExit;
+            pipeService = (executor == null)
+                    ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket)
+                    : ThreadUtils.noClose(executor);
             piper = pipeService.submit(() -> {
                 try {
                     boolean debugEnabled = log.isDebugEnabled();
@@ -134,17 +133,11 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
         return open.get();
     }
 
-    @Override
     public ExecutorService getExecutorService() {
         return pipeService;
     }
 
     @Override
-    public boolean isShutdownOnExit() {
-        return pipeCloseOnExit;
-    }
-
-    @Override
     public String getId() {
         return authSocket;
     }
@@ -194,7 +187,7 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
         }
 
         ExecutorService executor = getExecutorService();
-        if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) {
+        if ((executor != null) && (!executor.isShutdown())) {
             Collection<?> runners = executor.shutdownNow();
             if (debugEnabled) {
                 log.debug("Shut down runners count=" + GenericUtils.size(runners));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 395bd37..586faae 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -21,7 +21,7 @@ package org.apache.sshd.agent.unix;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
+import java.util.Collections;
 import java.util.concurrent.Future;
 
 import org.apache.sshd.agent.SshAgent;
@@ -33,6 +33,7 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 import org.apache.tomcat.jni.Local;
@@ -63,10 +64,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     private OutputStream out;
     private ExecutorService forwardService;
     private Future<?> forwarder;
-    private boolean shutdownForwarder;
 
-    public ChannelAgentForwarding() {
-        super();
+    public ChannelAgentForwarding(ExecutorService executor) {
+        super("", Collections.emptyList(), executor);
     }
 
     @Override
@@ -83,8 +83,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
             }
 
             ExecutorService service = getExecutorService();
-            forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service;
-            shutdownForwarder = service != forwardService || isShutdownOnExit();
+            forwardService = (service == null)
+                    ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]")
+                    : ThreadUtils.noClose(service);
 
             final int copyBufSize = this.getIntProperty(FORWARDER_BUFFER_SIZE, DEFAULT_FORWARDER_BUF_SIZE);
             ValidateUtils.checkTrue(copyBufSize >= MIN_FORWARDER_BUF_SIZE, "Copy buf size below min.: %d", copyBufSize);
@@ -136,7 +137,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
         }
 
         try {
-            if ((forwardService != null) && shutdownForwarder) {
+            if (forwardService != null) {
                 Collection<?> runners = forwardService.shutdownNow();
                 if (log.isDebugEnabled()) {
                     log.debug("Shut down runners count=" + GenericUtils.size(runners));
@@ -144,7 +145,6 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
             }
         } finally {
             forwardService = null;
-            shutdownForwarder = false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
index 4f31f41..e4d37d8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
@@ -18,25 +18,31 @@
  */
 package org.apache.sshd.agent.unix;
 
-import java.util.concurrent.ExecutorService;
-
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelFactory;
 import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ExecutorService;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorServiceCarrier {
+public class ChannelAgentForwardingFactory implements ChannelFactory {
+
     public static final ChannelAgentForwardingFactory OPENSSH = new ChannelAgentForwardingFactory("auth-agent@openssh.com");
     // see https://tools.ietf.org/html/draft-ietf-secsh-agent-02
     public static final ChannelAgentForwardingFactory IETF = new ChannelAgentForwardingFactory("auth-agent");
 
     private final String name;
+    private final Factory<ExecutorService> executorServiceFactory;
 
     public ChannelAgentForwardingFactory(String name) {
+        this(name, null);
+    }
+
+    public ChannelAgentForwardingFactory(String name, Factory<ExecutorService> executorServiceFactory) {
         this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No channel factory name specified");
+        this.executorServiceFactory = executorServiceFactory;
     }
 
     @Override
@@ -44,21 +50,10 @@ public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorSe
         return name;
     }
 
-    @Override   // user can override to provide an alternative
-    public ExecutorService getExecutorService() {
-        return null;
-    }
-
-    @Override
-    public boolean isShutdownOnExit() {
-        return false;
-    }
-
     @Override
     public Channel create() {
-        ChannelAgentForwarding channel = new ChannelAgentForwarding();
-        channel.setExecutorService(getExecutorService());
-        channel.setShutdownOnExit(isShutdownOnExit());
+        ExecutorService executorService = executorServiceFactory != null ? executorServiceFactory.create() : null;
+        ChannelAgentForwarding channel = new ChannelAgentForwarding(executorService);
         return channel;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
index 128214b..02c4f62 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
@@ -23,12 +23,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.agent.SshAgentServer;
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.SshException;
@@ -37,65 +37,40 @@ import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.server.session.ServerSession;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigurer {
+public class UnixAgentFactory implements SshAgentFactory {
     public static final List<NamedFactory<Channel>> DEFAULT_FORWARDING_CHANNELS =
             Collections.unmodifiableList(
                     Arrays.<NamedFactory<Channel>>asList(ChannelAgentForwardingFactory.OPENSSH, ChannelAgentForwardingFactory.IETF));
 
-    private ExecutorService executor;
-    private boolean shutdownExecutor;
+    private Factory<ExecutorService> executorServiceFactory;
 
     public UnixAgentFactory() {
         super();
     }
 
-    public UnixAgentFactory(ExecutorService service, boolean shutdown) {
-        executor = service;
-        shutdownExecutor = shutdown;
+    public UnixAgentFactory(Factory<ExecutorService> factory) {
+        executorServiceFactory = factory;
     }
 
-    @Override
-    public ExecutorService getExecutorService() {
-        return executor;
-    }
-
-    @Override
-    public void setExecutorService(ExecutorService service) {
-        executor = service;
-    }
-
-    @Override
-    public boolean isShutdownOnExit() {
-        return shutdownExecutor;
-    }
-
-    @Override
-    public void setShutdownOnExit(boolean shutdown) {
-        shutdownExecutor = shutdown;
+    protected ExecutorService newExecutor() {
+        return executorServiceFactory != null ? executorServiceFactory.create() : null;
     }
 
     @Override
     public List<NamedFactory<Channel>> getChannelForwardingFactories(FactoryManager manager) {
-        final ExecutorServiceConfigurer configurer = this;
-        return Collections.unmodifiableList(DEFAULT_FORWARDING_CHANNELS.stream()
-                .map(cf -> new ChannelAgentForwardingFactory(cf.getName()) {
-                    @Override
-                    public ExecutorService getExecutorService() {
-                        return configurer.getExecutorService();
-                    }
-
-                    @Override
-                    public boolean isShutdownOnExit() {
-                        return configurer.isShutdownOnExit();
-                    }
-                })
-                .collect(Collectors.toList()));
+        if (executorServiceFactory != null) {
+            return DEFAULT_FORWARDING_CHANNELS.stream()
+                    .map(cf -> new ChannelAgentForwardingFactory(cf.getName(), executorServiceFactory))
+                    .collect(Collectors.toList());
+        } else {
+            return DEFAULT_FORWARDING_CHANNELS;
+        }
     }
 
     @Override
@@ -105,7 +80,7 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu
             throw new SshException("No " + SshAgent.SSH_AUTHSOCKET_ENV_NAME + " value");
         }
 
-        return new AgentClient(authSocket, getExecutorService(), isShutdownOnExit());
+        return new AgentClient(authSocket, newExecutor());
     }
 
     @Override
@@ -113,6 +88,6 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu
         Session session = Objects.requireNonNull(service.getSession(), "No session");
         ValidateUtils.checkInstanceOf(session, ServerSession.class,
                 "The session used to create an agent server proxy must be a server session: %s", session);
-        return new AgentServerProxy(service, getExecutorService(), isShutdownOnExit());
+        return new AgentServerProxy(service, newExecutor());
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 78a6e12..1ed4518 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -101,6 +101,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         });
     }
 
+// TODO: investigate how to fix the forwarding channel failures when enabled
+//    @Override
+//    public ClientSession getSession() {
+//        return (ClientSession) super.getSession();
+//    }
+
     protected void addChannelSignalRequestHandlers(EventNotifier<String> notifier) {
         addRequestHandler(new ExitStatusChannelRequestHandler(exitStatusHolder, notifier));
         addRequestHandler(new ExitSignalChannelRequestHandler(exitSignalHolder, notifier));

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 459a659..b3f8c67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -20,7 +20,6 @@ package org.apache.sshd.client.channel;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.sshd.common.SshConstants;
@@ -35,6 +34,7 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 
 /**
@@ -46,7 +46,6 @@ public class ChannelSession extends AbstractClientChannel {
 
     private ExecutorService pumperService;
     private Future<?> pumper;
-    private boolean shutdownPumper;
 
     public ChannelSession() {
         super("session");
@@ -93,12 +92,9 @@ public class ChannelSession extends AbstractClientChannel {
                 if (service == null) {
                     pumperService = ThreadUtils.newSingleThreadExecutor("ClientInputStreamPump[" + this.toString() + "]");
                 } else {
-                    pumperService = service;
+                    pumperService = ThreadUtils.noClose(service);
                 }
 
-                // shutdown the temporary executor service if had to create it
-                shutdownPumper = (pumperService != service) || isShutdownOnExit();
-
                 // Interrupt does not really work and the thread will only exit when
                 // the call to read() will return.  So ensure this thread is a daemon
                 // to avoid blocking the whole app
@@ -130,7 +126,7 @@ public class ChannelSession extends AbstractClientChannel {
 
     @Override
     protected void doCloseImmediately() {
-        if ((pumper != null) && (pumperService != null) && shutdownPumper && (!pumperService.isShutdown())) {
+        if ((pumper != null) && (pumperService != null) && (!pumperService.isShutdown())) {
             try {
                 if (!pumper.isDone()) {
                     pumper.cancel(true);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
index 0aa5039..5ec3a69 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
@@ -443,8 +443,8 @@ public abstract class AbstractClientSession extends AbstractSession implements C
 
     @Override
     public KeyExchangeFuture switchToNoneCipher() throws IOException {
-        if (!(currentService instanceof AbstractConnectionService<?>)
-                || !GenericUtils.isEmpty(((AbstractConnectionService<?>) currentService).getChannels())) {
+        if (!(currentService instanceof AbstractConnectionService)
+                || !GenericUtils.isEmpty(((AbstractConnectionService) currentService).getChannels())) {
             throw new IllegalStateException("The switch to the none cipher must be done immediately after authentication");
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
index 1743a8e..497e61e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
@@ -39,7 +39,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ClientConnectionService extends AbstractConnectionService<AbstractClientSession> implements ClientSessionHolder {
+public class ClientConnectionService
+        extends AbstractConnectionService
+        implements ClientSessionHolder {
 
     private ScheduledFuture<?> heartBeat;
 
@@ -53,6 +55,11 @@ public class ClientConnectionService extends AbstractConnectionService<AbstractC
     }
 
     @Override
+    public AbstractClientSession getSession() {
+        return (AbstractClientSession) super.getSession();
+    }
+
+    @Override
     public void start() {
         ClientSession session = getClientSession();
         if (!session.isAuthenticated()) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 94e2d20..6d3c78e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -29,7 +29,6 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -59,7 +58,8 @@ import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
 import org.apache.sshd.common.util.closeable.IoBaseCloseable;
 import org.apache.sshd.common.util.io.IoUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 
 /**
  * Provides common client/server channel functionality
@@ -68,7 +68,7 @@ import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
  */
 public abstract class AbstractChannel
         extends AbstractInnerCloseable
-        implements Channel, ExecutorServiceConfigurer {
+        implements Channel, ExecutorServiceCarrier {
 
     /**
      * Default growth factor function used to resize response buffers
@@ -95,7 +95,6 @@ public abstract class AbstractChannel
     private int recipient = -1;
     private Session sessionInstance;
     private ExecutorService executor;
-    private boolean shutdownExecutor;
     private final List<RequestHandler<Channel>> requestHandlers = new CopyOnWriteArrayList<>();
 
     private final Window localWindow;
@@ -115,19 +114,20 @@ public abstract class AbstractChannel
     }
 
     protected AbstractChannel(boolean client, Collection<? extends RequestHandler<Channel>> handlers) {
-        this("", client, handlers);
+        this("", client, handlers, null);
     }
 
     protected AbstractChannel(String discriminator, boolean client) {
-        this(discriminator, client, Collections.emptyList());
+        this(discriminator, client, Collections.emptyList(), null);
     }
 
-    protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers) {
+    protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executorService) {
         super(discriminator);
         gracefulFuture = new DefaultCloseFuture(discriminator, lock);
         localWindow = new Window(this, null, client, true);
         remoteWindow = new Window(this, null, client, false);
         channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, getClass().getClassLoader(), channelListeners);
+        executor = executorService;
         addRequestHandlers(handlers);
     }
 
@@ -189,21 +189,6 @@ public abstract class AbstractChannel
     }
 
     @Override
-    public void setExecutorService(ExecutorService service) {
-        executor = service;
-    }
-
-    @Override
-    public boolean isShutdownOnExit() {
-        return shutdownExecutor;
-    }
-
-    @Override
-    public void setShutdownOnExit(boolean shutdown) {
-        shutdownExecutor = shutdown;
-    }
-
-    @Override
     public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
         return channelStreamPacketWriterResolver;
     }
@@ -655,7 +640,7 @@ public abstract class AbstractChannel
             }
 
             ExecutorService service = getExecutorService();
-            if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+            if ((service != null) && (!service.isShutdown())) {
                 Collection<?> running = service.shutdownNow();
                 if (debugEnabled) {
                     log.debug("close({})[immediately={}] shutdown executor service on close - running count={}",

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
index 5167365..243416c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
@@ -19,12 +19,13 @@
 
 package org.apache.sshd.common.io;
 
-import java.util.concurrent.ExecutorService;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.FactoryManagerHolder;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 
 /**
@@ -36,12 +37,10 @@ public abstract class AbstractIoServiceFactory
 
     private final FactoryManager manager;
     private final ExecutorService executor;
-    private final boolean shutdownExecutor;
 
-    protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService, boolean shutdownOnExit) {
-        manager = factoryManager;
-        executor = executorService;
-        shutdownExecutor = shutdownOnExit;
+    protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService) {
+        manager = Objects.requireNonNull(factoryManager);
+        executor = Objects.requireNonNull(executorService);
     }
 
     @Override
@@ -55,15 +54,10 @@ public abstract class AbstractIoServiceFactory
     }
 
     @Override
-    public final boolean isShutdownOnExit() {
-        return shutdownExecutor;
-    }
-
-    @Override
     protected void doCloseImmediately() {
         try {
             ExecutorService service = getExecutorService();
-            if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+            if ((service != null) && (!service.isShutdown())) {
                 log.debug("Shutdown executor");
                 service.shutdownNow();
                 if (service.awaitTermination(5, TimeUnit.SECONDS)) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
index 1ea4e6e..91f3942 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
@@ -19,54 +19,37 @@
 
 package org.apache.sshd.common.io;
 
-import java.util.concurrent.ExecutorService;
-
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractIoServiceFactoryFactory
         extends AbstractLoggingBean
-        implements IoServiceFactoryFactory, ExecutorServiceConfigurer {
+        implements IoServiceFactoryFactory {
 
-    private ExecutorService executorService;
-    private boolean shutdownExecutor;
+    private Factory<ExecutorService> executorServiceFactory;
 
     /**
-     * @param executors      The {@link ExecutorService} to use for spawning threads.
-     *                       If {@code null} then an internal service is allocated - in which case it
-     *                       is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt>
-     *                       parameter value
-     * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()}
-     *                       will be called (unless it is an internally allocated service which is always
-     *                       closed)
+     * @param factory      The {@link ExecutorService} factory to use for spawning threads.
+     *                     If {@code null} then an internal service will be allocated.
      */
-    protected AbstractIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
-        executorService = executors;
-        shutdownExecutor = shutdownOnExit;
-    }
-
-    @Override
-    public ExecutorService getExecutorService() {
-        return executorService;
+    protected AbstractIoServiceFactoryFactory(Factory<ExecutorService> factory) {
+        executorServiceFactory = factory;
     }
 
-    @Override
-    public void setExecutorService(ExecutorService service) {
-        executorService = service;
-
+    public Factory<ExecutorService> getExecutorServiceFactory() {
+        return executorServiceFactory;
     }
 
     @Override
-    public boolean isShutdownOnExit() {
-        return shutdownExecutor;
+    public void setExecutorServiceFactory(Factory<ExecutorService> factory) {
+        executorServiceFactory = factory;
     }
 
-    @Override
-    public void setShutdownOnExit(boolean shutdown) {
-        shutdownExecutor = shutdown;
+    protected ExecutorService newExecutor() {
+        return executorServiceFactory != null ? executorServiceFactory.create() : null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
index 74775ff..f30331a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
@@ -20,11 +20,11 @@ package org.apache.sshd.common.io;
 
 import java.util.Iterator;
 import java.util.ServiceLoader;
-import java.util.concurrent.ExecutorService;
 
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.util.GenericUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +37,11 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact
     private IoServiceFactoryFactory factory;
 
     protected DefaultIoServiceFactoryFactory() {
-        this(null, true);
+        this(null);
     }
 
-    protected DefaultIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
-        super(executors, shutdownOnExit);
+    protected DefaultIoServiceFactoryFactory(Factory<ExecutorService> factory) {
+        super(factory);
     }
 
     @Override
@@ -57,10 +57,9 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact
         synchronized (this) {
             if (factory == null) {
                 factory = newInstance(IoServiceFactoryFactory.class);
-                if (factory instanceof ExecutorServiceConfigurer) {
-                    ExecutorServiceConfigurer configurer = (ExecutorServiceConfigurer) factory;
-                    configurer.setExecutorService(getExecutorService());
-                    configurer.setShutdownOnExit(isShutdownOnExit());
+                Factory<ExecutorService> executorServiceFactory = getExecutorServiceFactory();
+                if (executorServiceFactory != null) {
+                    factory.setExecutorServiceFactory(executorServiceFactory);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
index 92f1e73..74b2237 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
@@ -18,13 +18,16 @@
  */
 package org.apache.sshd.common.io;
 
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.util.threads.ExecutorService;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-@FunctionalInterface
 public interface IoServiceFactoryFactory {
 
     IoServiceFactory create(FactoryManager manager);
+
+    void setExecutorServiceFactory(Factory<ExecutorService> factory);
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
index 1cdb651..12fd522 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
@@ -20,7 +20,6 @@ package org.apache.sshd.common.io.nio2;
 
 import java.io.IOException;
 import java.nio.channels.AsynchronousChannelGroup;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.FactoryManager;
@@ -29,6 +28,7 @@ import org.apache.sshd.common.io.AbstractIoServiceFactory;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 
 /**
@@ -38,12 +38,11 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory {
 
     private final AsynchronousChannelGroup group;
 
-    public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service, boolean shutdownOnExit) {
+    public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service) {
         super(factoryManager,
-                service == null ? ThreadUtils.newFixedThreadPool(factoryManager.toString() + "-nio2", getNioWorkers(factoryManager)) : service,
-                service == null || shutdownOnExit);
+              ThreadUtils.newFixedThreadPoolIf(service, factoryManager.toString() + "-nio2", getNioWorkers(factoryManager)));
         try {
-            group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.protectExecutorServiceShutdown(getExecutorService(), isShutdownOnExit()));
+            group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.noClose(getExecutorService()));
         } catch (IOException e) {
             log.warn("Failed (" + e.getClass().getSimpleName() + " to start async. channel group: " + e.getMessage());
             if (log.isDebugEnabled()) {
@@ -71,12 +70,10 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory {
                 group.shutdownNow();
 
                 // if we protect the executor then the await will fail since we didn't really shut it down...
-                if (isShutdownOnExit()) {
-                    if (group.awaitTermination(5, TimeUnit.SECONDS)) {
-                        log.debug("Group successfully shut down");
-                    } else {
-                        log.debug("Not all group tasks terminated");
-                    }
+                if (group.awaitTermination(5, TimeUnit.SECONDS)) {
+                    log.debug("Group successfully shut down");
+                } else {
+                    log.debug("Not all group tasks terminated");
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
index 3f64b81..d99874c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
@@ -20,18 +20,19 @@ package org.apache.sshd.common.io.nio2;
 
 import java.nio.channels.AsynchronousChannel;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
 
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.io.AbstractIoServiceFactoryFactory;
 import org.apache.sshd.common.io.IoServiceFactory;
+import org.apache.sshd.common.util.threads.ExecutorService;
 
 /**
  */
 public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory {
 
     public Nio2ServiceFactoryFactory() {
-        this(null, true);
+        this(null);
     }
 
     /**
@@ -39,18 +40,15 @@ public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory {
      *                       If {@code null} then an internal service is allocated - in which case it
      *                       is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt>
      *                       parameter value
-     * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()}
-     *                       will be called (unless it is an internally allocated service which is always
-     *                       closed)
      */
-    public Nio2ServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
-        super(executors, shutdownOnExit);
+    public Nio2ServiceFactoryFactory(Factory<ExecutorService> executors) {
+        super(executors);
         // Make sure NIO2 is available
         Objects.requireNonNull(AsynchronousChannel.class, "Missing NIO2 class");
     }
 
     @Override
     public IoServiceFactory create(FactoryManager manager) {
-        return new Nio2ServiceFactory(manager, getExecutorService(), isShutdownOnExit());
+        return new Nio2ServiceFactory(manager, newExecutor());
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 78b1257..cd711e0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -52,7 +52,6 @@ import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
-import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
@@ -66,10 +65,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
 /**
  * Base implementation of ConnectionService.
  *
- * @param <S> Type of {@link AbstractSession} being used
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractConnectionService<S extends AbstractSession>
+public abstract class AbstractConnectionService
                 extends AbstractInnerCloseable
                 implements ConnectionService {
     /**
@@ -105,10 +103,10 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
     private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>();
     private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>();
     private final PortForwardingEventListener listenerProxy;
-    private final S sessionInstance;
+    private final AbstractSession sessionInstance;
     private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
 
-    protected AbstractConnectionService(S session) {
+    protected AbstractConnectionService(AbstractSession session) {
         sessionInstance = Objects.requireNonNull(session, "No session");
         listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
     }
@@ -166,7 +164,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
     }
 
     @Override
-    public S getSession() {
+    public AbstractSession getSession() {
         return sessionInstance;
     }
 
@@ -178,7 +176,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
     @Override
     public ForwardingFilter getForwardingFilter() {
         ForwardingFilter forwarder;
-        S session = getSession();
+        AbstractSession session = getSession();
         synchronized (forwarderHolder) {
             forwarder = forwarderHolder.get();
             if (forwarder != null) {
@@ -202,7 +200,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
         super.preClose();
     }
 
-    protected ForwardingFilter createForwardingFilter(S session) {
+    protected ForwardingFilter createForwardingFilter(AbstractSession session) {
         FactoryManager manager =
             Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
         ForwardingFilterFactory factory =
@@ -215,7 +213,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
     @Override
     public X11ForwardSupport getX11ForwardSupport() {
         X11ForwardSupport x11Support;
-        S session = getSession();
+        AbstractSession session = getSession();
         synchronized (x11ForwardHolder) {
             x11Support = x11ForwardHolder.get();
             if (x11Support != null) {
@@ -232,14 +230,14 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
         return x11Support;
     }
 
-    protected X11ForwardSupport createX11ForwardSupport(S session) {
+    protected X11ForwardSupport createX11ForwardSupport(AbstractSession session) {
         return new DefaultX11ForwardSupport(this);
     }
 
     @Override
     public AgentForwardSupport getAgentForwardSupport() {
         AgentForwardSupport agentForward;
-        S session = getSession();
+        AbstractSession session = getSession();
         synchronized (agentForwardHolder) {
             agentForward = agentForwardHolder.get();
             if (agentForward != null) {
@@ -257,7 +255,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
         return agentForward;
     }
 
-    protected AgentForwardSupport createAgentForwardSupport(S session) {
+    protected AgentForwardSupport createAgentForwardSupport(AbstractSession session) {
         return new DefaultAgentForwardSupport(this);
     }
 
@@ -275,7 +273,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
 
     @Override
     public int registerChannel(Channel channel) throws IOException {
-        Session session = getSession();
+        AbstractSession session = getSession();
         int maxChannels = session.getIntProperty(MAX_CONCURRENT_CHANNELS_PROP, DEFAULT_MAX_CHANNELS);
         int curSize = channels.size();
         if (curSize > maxChannels) {
@@ -584,7 +582,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
             return handler;
         }
 
-        Session s = getSession();
+        AbstractSession s = getSession();
         return (s == null) ? null : s.resolveUnknownChannelReferenceHandler();
     }
 
@@ -614,7 +612,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
             return;
         }
 
-        Session session = getSession();
+        AbstractSession session = getSession();
         FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
         Channel channel = NamedFactory.create(manager.getChannelFactories(), type);
         if (channel == null) {
@@ -673,7 +671,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
                       this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message);
         }
 
-        Session session = getSession();
+        AbstractSession session = getSession();
         Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE,
                 Long.SIZE + GenericUtils.length(message) + GenericUtils.length(lang));
         buf.putInt(sender);
@@ -701,7 +699,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
                       this, req, wantReply);
         }
 
-        Session session = getSession();
+        AbstractSession session = getSession();
         FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
         Collection<RequestHandler<ConnectionService>> handlers = manager.getGlobalRequestHandlers();
         if (GenericUtils.size(handlers) > 0) {
@@ -755,18 +753,18 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
         byte cmd = RequestHandler.Result.ReplySuccess.equals(result)
                  ? SshConstants.SSH_MSG_REQUEST_SUCCESS
                  : SshConstants.SSH_MSG_REQUEST_FAILURE;
-        Session session = getSession();
+        AbstractSession session = getSession();
         Buffer rsp = session.createBuffer(cmd, 2);
         return session.writePacket(rsp);
     }
 
     protected void requestSuccess(Buffer buffer) throws Exception {
-        S s = getSession();
+        AbstractSession s = (AbstractSession) getSession();
         s.requestSuccess(buffer);
     }
 
     protected void requestFailure(Buffer buffer) throws Exception {
-        S s = getSession();
+        AbstractSession s = (AbstractSession) getSession();
         s.requestFailure(buffer);
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
new file mode 100644
index 0000000..fe7ccbe
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.threads;
+
+import org.apache.sshd.common.Closeable;
+
+public interface ExecutorService extends java.util.concurrent.ExecutorService, Closeable {
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
index 71c8c36..7e9378b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
@@ -19,8 +19,6 @@
 
 package org.apache.sshd.common.util.threads;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
@@ -30,10 +28,4 @@ public interface ExecutorServiceCarrier {
      */
     ExecutorService getExecutorService();
 
-    /**
-     * @return If {@code true} then the {@link ExecutorService#shutdownNow()}
-     * will be called (unless it is an internally allocated service which is always
-     * closed)
-     */
-    boolean isShutdownOnExit();
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
deleted file mode 100644
index e8519c8..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sshd.common.util.threads;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-public interface ExecutorServiceConfigurer extends ExecutorServiceCarrier {
-    void setExecutorService(ExecutorService service);
-
-    void setShutdownOnExit(boolean shutdown);
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
index 803e87e..34a935b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
@@ -18,26 +18,33 @@
  */
 package org.apache.sshd.common.util.threads;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
+import java.io.IOException;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 
 /**
@@ -66,34 +73,17 @@ public final class ThreadUtils {
      * value of the <tt>shutdownOnExit</tt> parameter
      */
     public static ExecutorService protectExecutorServiceShutdown(final ExecutorService executorService, boolean shutdownOnExit) {
-        if (executorService == null || shutdownOnExit) {
+        if (executorService == null || shutdownOnExit || executorService instanceof NoCloseExecutor) {
             return executorService;
         } else {
-            return (ExecutorService) Proxy.newProxyInstance(
-                    resolveDefaultClassLoader(executorService),
-                    new Class<?>[]{ExecutorService.class},
-                    new InvocationHandler() {
-                        private final AtomicBoolean stopped = new AtomicBoolean(false);
-
-                        @Override
-                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                            String name = method.getName();
-                            if ("isShutdown".equals(name)) {
-                                return stopped.get();
-                            } else if ("shutdown".equals(name)) {
-                                stopped.set(true);
-                                return null;    // void...
-                            } else if ("shutdownNow".equals(name)) {
-                                stopped.set(true);
-                                return Collections.emptyList();
-                            } else {
-                                return method.invoke(executorService, args);
-                            }
-                        }
-                    });
+            return new NoCloseExecutor(executorService);
         }
     }
 
+    public static ExecutorService noClose(ExecutorService executorService) {
+        return protectExecutorServiceShutdown(executorService, false);
+    }
+
     public static ClassLoader resolveDefaultClassLoader(Object anchor) {
         return resolveDefaultClassLoader(anchor == null ? null : anchor.getClass());
     }
@@ -180,16 +170,26 @@ public final class ThreadUtils {
         return cls;
     }
 
+    public static ExecutorService newFixedThreadPoolIf(ExecutorService executorService, String poolName, int nThreads) {
+        return executorService == null ? newFixedThreadPool(poolName, nThreads) : executorService;
+    }
+
     public static ExecutorService newFixedThreadPool(String poolName, int nThreads) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
+        return new ThreadPoolExecutor(
+                nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS, // TODO make this configurable
                 new LinkedBlockingQueue<>(),
                 new SshdThreadFactory(poolName),
                 new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
+    public static ExecutorService newCachedThreadPoolIf(ExecutorService executorService, String poolName) {
+        return executorService == null ? newCachedThreadPool(poolName) : executorService;
+    }
+
     public static ExecutorService newCachedThreadPool(String poolName) {
-        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // TODO make this configurable
+        return new ThreadPoolExecutor(
+                0, Integer.MAX_VALUE, // TODO make this configurable
                 60L, TimeUnit.SECONDS, // TODO make this configurable
                 new SynchronousQueue<>(),
                 new SshdThreadFactory(poolName),
@@ -248,4 +248,218 @@ public final class ThreadUtils {
             return t;
         }
     }
+
+    public static class NoCloseExecutor implements ExecutorService {
+
+        protected final ExecutorService executor;
+        protected final CloseFuture closeFuture;
+
+        public NoCloseExecutor(ExecutorService executor) {
+            this.executor = executor;
+            closeFuture = new DefaultCloseFuture(null, null);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return executor.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return executor.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return executor.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return executor.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            return executor.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+            return executor.invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return executor.invokeAny(tasks, timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            executor.execute(command);
+        }
+
+        @Override
+        public void shutdown() {
+            close(true);
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            close(true);
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return isClosed();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return isClosed();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            try {
+                return closeFuture.await(timeout, unit);
+            } catch (IOException e) {
+                throw (InterruptedException) new InterruptedException().initCause(e);
+            }
+        }
+
+        @Override
+        public CloseFuture close(boolean immediately) {
+            closeFuture.setClosed();
+            return closeFuture;
+        }
+
+        @Override
+        public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+            closeFuture.addListener(listener);
+        }
+
+        @Override
+        public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+            closeFuture.removeListener(listener);
+        }
+
+        @Override
+        public boolean isClosed() {
+            return closeFuture.isClosed();
+        }
+
+        @Override
+        public boolean isClosing() {
+            return isClosed();
+        }
+
+    }
+
+    public static class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor implements ExecutorService {
+
+        final DelegateCloseable closeable = new DelegateCloseable();
+
+        class DelegateCloseable extends AbstractCloseable {
+            DelegateCloseable() {
+            }
+
+            @Override
+            protected CloseFuture doCloseGracefully() {
+                shutdown();
+                return closeFuture;
+            }
+
+            @Override
+            protected void doCloseImmediately() {
+                shutdownNow();
+                super.doCloseImmediately();
+            }
+
+            void setClosed() {
+                closeFuture.setClosed();
+            }
+        }
+
+        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+        }
+
+        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        }
+
+        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+        }
+
+        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+                                  long keepAliveTime, TimeUnit unit,
+                                  BlockingQueue<Runnable> workQueue,
+                                  ThreadFactory threadFactory,
+                                  RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+        }
+
+        @Override
+        protected void terminated() {
+            closeable.doCloseImmediately();
+        }
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return super.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return super.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminating() {
+            return super.isTerminating();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return super.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return super.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public CloseFuture close(boolean immediately) {
+            return closeable.close(immediately);
+        }
+
+        @Override
+        public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+            closeable.addCloseFutureListener(listener);
+        }
+
+        @Override
+        public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+            closeable.removeCloseFutureListener(listener);
+        }
+
+        @Override
+        public boolean isClosed() {
+            return closeable.isClosed();
+        }
+
+        @Override
+        public boolean isClosing() {
+            return closeable.isClosing();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 2f3a766..b9da5a7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -35,6 +35,7 @@ import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.server.session.ServerSession;
 
 /**
@@ -46,17 +47,19 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
 
     protected final AtomicBoolean exitStatusSent = new AtomicBoolean(false);
 
-    protected AbstractServerChannel() {
-        this(Collections.emptyList());
+    protected AbstractServerChannel(ExecutorService executor) {
+        super("", false, Collections.emptyList(), executor);
     }
 
-    protected AbstractServerChannel(Collection<? extends RequestHandler<Channel>> handlers) {
-        this("", handlers);
+    protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executor) {
+        super(discriminator, false, handlers, executor);
     }
 
-    protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers) {
-        super(discriminator, false, handlers);
-    }
+// TODO: investigate how to fix the forwarding channel failures when enabled
+//    @Override
+//    public ServerSession getSession() {
+//        return (ServerSession) super.getSession();
+//    }
 
     @Override
     public ServerSession getServerSession() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index cfaa499..78f5861 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -77,7 +77,7 @@ import org.apache.sshd.server.session.ServerSession;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
 /**
- * TODO Add javadoc
+ * TODO Add javadocWindowInitTest
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
@@ -102,12 +102,17 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     public ChannelSession(Collection<? extends RequestHandler<Channel>> handlers) {
-        super(handlers);
+        super("", handlers, null);
 
         commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), lock);
     }
 
     @Override
+    public ServerSession getSession() {
+        return (ServerSession) super.getSession();
+    }
+
+    @Override
     public void handleWindowAdjust(Buffer buffer) throws IOException {
         super.handleWindowAdjust(buffer);
         if (asyncOut != null) {