You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/07/25 12:58:15 UTC
[2/3] mina-sshd git commit: [SSHD-835] Introduce a Closeable ExecutorService
[SSHD-835] Introduce a Closeable ExecutorService
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/7b35bb36
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/7b35bb36
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/7b35bb36
Branch: refs/heads/master
Commit: 7b35bb360410f21be06ad16a7d0a0a0d7cc012af
Parents: 101a509
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu May 31 09:51:49 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Wed Jul 25 14:37:52 2018 +0200
----------------------------------------------------------------------
.../sshd/agent/common/AbstractAgentProxy.java | 27 +-
.../sshd/agent/local/AgentForwardedChannel.java | 2 +-
.../agent/local/ChannelAgentForwarding.java | 6 +-
.../local/ChannelAgentForwardingFactory.java | 2 +-
.../org/apache/sshd/agent/unix/AgentClient.java | 12 +-
.../org/apache/sshd/agent/unix/AgentServer.java | 22 +-
.../sshd/agent/unix/AgentServerProxy.java | 23 +-
.../sshd/agent/unix/ChannelAgentForwarding.java | 16 +-
.../unix/ChannelAgentForwardingFactory.java | 29 +-
.../sshd/agent/unix/UnixAgentFactory.java | 59 ++--
.../client/channel/AbstractClientChannel.java | 6 +
.../sshd/client/channel/ChannelSession.java | 10 +-
.../client/session/AbstractClientSession.java | 4 +-
.../client/session/ClientConnectionService.java | 9 +-
.../sshd/common/channel/AbstractChannel.java | 31 +--
.../common/io/AbstractIoServiceFactory.java | 18 +-
.../io/AbstractIoServiceFactoryFactory.java | 45 +--
.../io/DefaultIoServiceFactoryFactory.java | 17 +-
.../sshd/common/io/IoServiceFactoryFactory.java | 5 +-
.../sshd/common/io/nio2/Nio2ServiceFactory.java | 19 +-
.../io/nio2/Nio2ServiceFactoryFactory.java | 14 +-
.../helpers/AbstractConnectionService.java | 38 ++-
.../common/util/threads/ExecutorService.java | 26 ++
.../util/threads/ExecutorServiceCarrier.java | 8 -
.../util/threads/ExecutorServiceConfigurer.java | 31 ---
.../sshd/common/util/threads/ThreadUtils.java | 276 ++++++++++++++++---
.../server/channel/AbstractServerChannel.java | 17 +-
.../sshd/server/channel/ChannelSession.java | 9 +-
.../server/command/AbstractCommandSupport.java | 14 +-
.../command/AbstractFileSystemCommand.java | 6 +-
.../sshd/server/forward/DirectTcpipFactory.java | 1 +
.../server/forward/ForwardedTcpipFactory.java | 1 +
.../sshd/server/forward/TcpipServerChannel.java | 43 +--
.../server/session/ServerConnectionService.java | 8 +-
.../forward/AbstractServerCloseTestSupport.java | 15 +-
.../io/DefaultIoServiceFactoryFactoryTest.java | 22 +-
.../sshd/common/util/ThreadUtilsTest.java | 5 +-
.../java/org/apache/sshd/server/ServerTest.java | 2 +-
.../sshd/util/test/CommandExecutionHelper.java | 2 +-
.../org/apache/sshd/git/AbstractGitCommand.java | 6 +-
.../sshd/git/AbstractGitCommandFactory.java | 14 +-
.../apache/sshd/git/pack/GitPackCommand.java | 8 +-
.../sshd/git/pack/GitPackCommandFactory.java | 10 +-
.../org/apache/sshd/git/pgm/GitPgmCommand.java | 8 +-
.../sshd/git/pgm/GitPgmCommandFactory.java | 10 +-
.../sshd/common/io/mina/MinaServiceFactory.java | 9 +-
.../io/mina/MinaServiceFactoryFactory.java | 21 +-
.../netty/NettyIoServiceFactoryFactory.java | 5 +-
.../sshd/client/scp/AbstractScpClient.java | 4 +
.../org/apache/sshd/server/scp/ScpCommand.java | 9 +-
.../sshd/server/scp/ScpCommandFactory.java | 26 +-
.../org/apache/sshd/client/scp/ScpTest.java | 8 +-
.../sshd/server/scp/ScpCommandFactoryTest.java | 6 +-
.../server/subsystem/sftp/SftpSubsystem.java | 17 +-
.../subsystem/sftp/SftpSubsystemFactory.java | 33 +--
.../client/subsystem/sftp/SftpVersionsTest.java | 4 +-
.../SpaceAvailableExtensionImplTest.java | 2 +-
.../openssh/helpers/OpenSSHExtensionsTest.java | 2 +-
.../sftp/SftpSubsystemFactoryTest.java | 6 +-
59 files changed, 556 insertions(+), 552 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
index 028b48e..0432eec 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AbstractAgentProxy.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import org.apache.sshd.agent.SshAgent;
import org.apache.sshd.agent.SshAgentConstants;
@@ -38,19 +37,20 @@ import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceConfigurer {
+public abstract class AbstractAgentProxy extends AbstractLoggingBean implements SshAgent, ExecutorServiceCarrier {
private ExecutorService executor;
- private boolean shutdownExecutor;
private String channelType = FactoryManager.AGENT_FORWARDING_TYPE_OPENSSH;
- protected AbstractAgentProxy() {
+ protected AbstractAgentProxy(ExecutorService executorService) {
super();
+ executor = executorService;
}
public String getChannelType() {
@@ -67,21 +67,6 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements
}
@Override
- public void setExecutorService(ExecutorService service) {
- executor = service;
- }
-
- @Override
- public boolean isShutdownOnExit() {
- return shutdownExecutor;
- }
-
- @Override
- public void setShutdownOnExit(boolean shutdown) {
- shutdownExecutor = shutdown;
- }
-
- @Override
public List<? extends Map.Entry<PublicKey, String>> getIdentities() throws IOException {
int cmd = SshAgentConstants.SSH2_AGENTC_REQUEST_IDENTITIES;
int okcmd = SshAgentConstants.SSH2_AGENT_IDENTITIES_ANSWER;
@@ -213,7 +198,7 @@ public abstract class AbstractAgentProxy extends AbstractLoggingBean implements
@Override
public void close() throws IOException {
ExecutorService service = getExecutorService();
- if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+ if ((service != null) && (!service.isShutdown())) {
Collection<?> runners = service.shutdownNow();
if (log.isDebugEnabled()) {
log.debug("close() - shutdown runners count=" + GenericUtils.size(runners));
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 9306bc8..7657937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -46,7 +46,7 @@ public class AgentForwardedChannel extends AbstractClientChannel {
}
public SshAgent getAgent() {
- AbstractAgentProxy rtn = new AbstractAgentProxy() {
+ AbstractAgentProxy rtn = new AbstractAgentProxy(null) {
private final AtomicBoolean open = new AtomicBoolean(true);
@Override
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index a1868c7..e051f1e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -20,6 +20,7 @@ package org.apache.sshd.agent.local;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.Objects;
import org.apache.sshd.agent.SshAgent;
@@ -36,6 +37,7 @@ import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.server.channel.AbstractServerChannel;
/**
@@ -46,8 +48,8 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
private SshAgent agent;
private AgentClient client;
- public ChannelAgentForwarding() {
- super();
+ public ChannelAgentForwarding(ExecutorService executor) {
+ super("", Collections.emptyList(), executor);
}
@Override
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
index 1396b61..40f9135 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwardingFactory.java
@@ -44,6 +44,6 @@ public class ChannelAgentForwardingFactory implements ChannelFactory {
@Override
public Channel create() {
- return new ChannelAgentForwarding();
+ return new ChannelAgentForwarding(null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
index 97acd90..7f80a89 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +29,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.tomcat.jni.Local;
import org.apache.tomcat.jni.Pool;
@@ -50,15 +50,13 @@ public class AgentClient extends AbstractAgentProxy implements Runnable {
private final AtomicBoolean open = new AtomicBoolean(true);
public AgentClient(String authSocket) throws IOException {
- this(authSocket, null, false);
+ this(authSocket, null);
}
- public AgentClient(String authSocket, ExecutorService executor, boolean shutdownOnExit) throws IOException {
+ public AgentClient(String authSocket, ExecutorService executor) throws IOException {
+ super((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor);
this.authSocket = authSocket;
- setExecutorService((executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentClient[" + authSocket + "]") : executor);
- setShutdownOnExit((executor == null) || shutdownOnExit);
-
try {
pool = Pool.create(AprLibrary.getInstance().getRootPool());
handle = Local.create(authSocket, pool);
@@ -141,7 +139,7 @@ public class AgentClient extends AbstractAgentProxy implements Runnable {
Socket.close(handle);
}
- if ((pumper != null) && isShutdownOnExit() && (!pumper.isDone())) {
+ if ((pumper != null) && (!pumper.isDone())) {
pumper.cancel(true);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
index 113ea23..6c6e13a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServer.java
@@ -21,7 +21,6 @@ package org.apache.sshd.agent.unix;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.sshd.agent.SshAgent;
@@ -31,6 +30,7 @@ import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.tomcat.jni.Local;
@@ -46,24 +46,23 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
private final SshAgent agent;
private final ExecutorService service;
- private final boolean shutdownExecutor;
private Future<?> agentThread;
private String authSocket;
private long pool;
private long handle;
public AgentServer() {
- this(null, false);
+ this(null);
}
- public AgentServer(ExecutorService executor, boolean shutdownOnExit) {
- this(new AgentImpl(), executor, shutdownOnExit);
+ public AgentServer(ExecutorService executor) {
+ this(new AgentImpl(), executor);
}
- public AgentServer(SshAgent agent, ExecutorService executor, boolean shutdownOnExit) {
+ public AgentServer(SshAgent agent, ExecutorService executor) {
this.agent = agent;
- this.service = (executor == null) ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor;
- this.shutdownExecutor = service != executor || shutdownOnExit;
+ this.service = (executor == null)
+ ? ThreadUtils.newSingleThreadExecutor("AgentServer[" + agent + "]") : executor;
}
public SshAgent getAgent() {
@@ -75,11 +74,6 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
return service;
}
- @Override
- public boolean isShutdownOnExit() {
- return shutdownExecutor;
- }
-
public String start() throws Exception {
authSocket = AprLibrary.createLocalSocketAddress();
pool = Pool.create(AprLibrary.getInstance().getRootPool());
@@ -129,7 +123,7 @@ public class AgentServer extends AbstractLoggingBean implements Closeable, Execu
}
ExecutorService executor = getExecutorService();
- if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) {
+ if ((executor != null) && (!executor.isShutdown())) {
Collection<?> runners = executor.shutdownNow();
if (log.isDebugEnabled()) {
log.debug("Shut down runners count=" + GenericUtils.size(runners));
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
index 5b66e57..4401a0d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentServerProxy.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,7 +32,7 @@ import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.OsUtils;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.tomcat.jni.Local;
import org.apache.tomcat.jni.Pool;
@@ -43,7 +42,7 @@ import org.apache.tomcat.jni.Status;
/**
* The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side.
*/
-public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer, ExecutorServiceCarrier {
+public class AgentServerProxy extends AbstractLoggingBean implements SshAgentServer {
/**
* Property that can be set on the {@link Session} in order to control
* the authentication timeout (millis). If not specified then
@@ -61,15 +60,14 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
private final long handle;
private Future<?> piper;
private final ExecutorService pipeService;
- private final boolean pipeCloseOnExit;
private final AtomicBoolean open = new AtomicBoolean(true);
private final AtomicBoolean innerFinished = new AtomicBoolean(false);
public AgentServerProxy(ConnectionService service) throws IOException {
- this(service, null, false);
+ this(service, null);
}
- public AgentServerProxy(ConnectionService service, ExecutorService executor, boolean shutdownOnExit) throws IOException {
+ public AgentServerProxy(ConnectionService service, ExecutorService executor) throws IOException {
this.service = service;
try {
String authSocket = AprLibrary.createLocalSocketAddress();
@@ -89,8 +87,9 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
throw toIOException(result);
}
- pipeService = (executor == null) ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket) : executor;
- pipeCloseOnExit = executor != pipeService || shutdownOnExit;
+ pipeService = (executor == null)
+ ? ThreadUtils.newSingleThreadExecutor("sshd-AgentServerProxy-PIPE-" + authSocket)
+ : ThreadUtils.noClose(executor);
piper = pipeService.submit(() -> {
try {
boolean debugEnabled = log.isDebugEnabled();
@@ -134,17 +133,11 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
return open.get();
}
- @Override
public ExecutorService getExecutorService() {
return pipeService;
}
@Override
- public boolean isShutdownOnExit() {
- return pipeCloseOnExit;
- }
-
- @Override
public String getId() {
return authSocket;
}
@@ -194,7 +187,7 @@ public class AgentServerProxy extends AbstractLoggingBean implements SshAgentSer
}
ExecutorService executor = getExecutorService();
- if ((executor != null) && isShutdownOnExit() && (!executor.isShutdown())) {
+ if ((executor != null) && (!executor.isShutdown())) {
Collection<?> runners = executor.shutdownNow();
if (debugEnabled) {
log.debug("Shut down runners count=" + GenericUtils.size(runners));
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 395bd37..586faae 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -21,7 +21,7 @@ package org.apache.sshd.agent.unix;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
+import java.util.Collections;
import java.util.concurrent.Future;
import org.apache.sshd.agent.SshAgent;
@@ -33,6 +33,7 @@ import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.server.channel.AbstractServerChannel;
import org.apache.tomcat.jni.Local;
@@ -63,10 +64,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
private OutputStream out;
private ExecutorService forwardService;
private Future<?> forwarder;
- private boolean shutdownForwarder;
- public ChannelAgentForwarding() {
- super();
+ public ChannelAgentForwarding(ExecutorService executor) {
+ super("", Collections.emptyList(), executor);
}
@Override
@@ -83,8 +83,9 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
}
ExecutorService service = getExecutorService();
- forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service;
- shutdownForwarder = service != forwardService || isShutdownOnExit();
+ forwardService = (service == null)
+ ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]")
+ : ThreadUtils.noClose(service);
final int copyBufSize = this.getIntProperty(FORWARDER_BUFFER_SIZE, DEFAULT_FORWARDER_BUF_SIZE);
ValidateUtils.checkTrue(copyBufSize >= MIN_FORWARDER_BUF_SIZE, "Copy buf size below min.: %d", copyBufSize);
@@ -136,7 +137,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
}
try {
- if ((forwardService != null) && shutdownForwarder) {
+ if (forwardService != null) {
Collection<?> runners = forwardService.shutdownNow();
if (log.isDebugEnabled()) {
log.debug("Shut down runners count=" + GenericUtils.size(runners));
@@ -144,7 +145,6 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
}
} finally {
forwardService = null;
- shutdownForwarder = false;
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
index 4f31f41..e4d37d8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwardingFactory.java
@@ -18,25 +18,31 @@
*/
package org.apache.sshd.agent.unix;
-import java.util.concurrent.ExecutorService;
-
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ExecutorService;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorServiceCarrier {
+public class ChannelAgentForwardingFactory implements ChannelFactory {
+
public static final ChannelAgentForwardingFactory OPENSSH = new ChannelAgentForwardingFactory("auth-agent@openssh.com");
// see https://tools.ietf.org/html/draft-ietf-secsh-agent-02
public static final ChannelAgentForwardingFactory IETF = new ChannelAgentForwardingFactory("auth-agent");
private final String name;
+ private final Factory<ExecutorService> executorServiceFactory;
public ChannelAgentForwardingFactory(String name) {
+ this(name, null);
+ }
+
+ public ChannelAgentForwardingFactory(String name, Factory<ExecutorService> executorServiceFactory) {
this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No channel factory name specified");
+ this.executorServiceFactory = executorServiceFactory;
}
@Override
@@ -44,21 +50,10 @@ public class ChannelAgentForwardingFactory implements ChannelFactory, ExecutorSe
return name;
}
- @Override // user can override to provide an alternative
- public ExecutorService getExecutorService() {
- return null;
- }
-
- @Override
- public boolean isShutdownOnExit() {
- return false;
- }
-
@Override
public Channel create() {
- ChannelAgentForwarding channel = new ChannelAgentForwarding();
- channel.setExecutorService(getExecutorService());
- channel.setShutdownOnExit(isShutdownOnExit());
+ ExecutorService executorService = executorServiceFactory != null ? executorServiceFactory.create() : null;
+ ChannelAgentForwarding channel = new ChannelAgentForwarding(executorService);
return channel;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
index 128214b..02c4f62 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/UnixAgentFactory.java
@@ -23,12 +23,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.sshd.agent.SshAgent;
import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.agent.SshAgentServer;
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.SshException;
@@ -37,65 +37,40 @@ import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.server.session.ServerSession;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigurer {
+public class UnixAgentFactory implements SshAgentFactory {
public static final List<NamedFactory<Channel>> DEFAULT_FORWARDING_CHANNELS =
Collections.unmodifiableList(
Arrays.<NamedFactory<Channel>>asList(ChannelAgentForwardingFactory.OPENSSH, ChannelAgentForwardingFactory.IETF));
- private ExecutorService executor;
- private boolean shutdownExecutor;
+ private Factory<ExecutorService> executorServiceFactory;
public UnixAgentFactory() {
super();
}
- public UnixAgentFactory(ExecutorService service, boolean shutdown) {
- executor = service;
- shutdownExecutor = shutdown;
+ public UnixAgentFactory(Factory<ExecutorService> factory) {
+ executorServiceFactory = factory;
}
- @Override
- public ExecutorService getExecutorService() {
- return executor;
- }
-
- @Override
- public void setExecutorService(ExecutorService service) {
- executor = service;
- }
-
- @Override
- public boolean isShutdownOnExit() {
- return shutdownExecutor;
- }
-
- @Override
- public void setShutdownOnExit(boolean shutdown) {
- shutdownExecutor = shutdown;
+ protected ExecutorService newExecutor() {
+ return executorServiceFactory != null ? executorServiceFactory.create() : null;
}
@Override
public List<NamedFactory<Channel>> getChannelForwardingFactories(FactoryManager manager) {
- final ExecutorServiceConfigurer configurer = this;
- return Collections.unmodifiableList(DEFAULT_FORWARDING_CHANNELS.stream()
- .map(cf -> new ChannelAgentForwardingFactory(cf.getName()) {
- @Override
- public ExecutorService getExecutorService() {
- return configurer.getExecutorService();
- }
-
- @Override
- public boolean isShutdownOnExit() {
- return configurer.isShutdownOnExit();
- }
- })
- .collect(Collectors.toList()));
+ if (executorServiceFactory != null) {
+ return DEFAULT_FORWARDING_CHANNELS.stream()
+ .map(cf -> new ChannelAgentForwardingFactory(cf.getName(), executorServiceFactory))
+ .collect(Collectors.toList());
+ } else {
+ return DEFAULT_FORWARDING_CHANNELS;
+ }
}
@Override
@@ -105,7 +80,7 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu
throw new SshException("No " + SshAgent.SSH_AUTHSOCKET_ENV_NAME + " value");
}
- return new AgentClient(authSocket, getExecutorService(), isShutdownOnExit());
+ return new AgentClient(authSocket, newExecutor());
}
@Override
@@ -113,6 +88,6 @@ public class UnixAgentFactory implements SshAgentFactory, ExecutorServiceConfigu
Session session = Objects.requireNonNull(service.getSession(), "No session");
ValidateUtils.checkInstanceOf(session, ServerSession.class,
"The session used to create an agent server proxy must be a server session: %s", session);
- return new AgentServerProxy(service, getExecutorService(), isShutdownOnExit());
+ return new AgentServerProxy(service, newExecutor());
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 78a6e12..1ed4518 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -101,6 +101,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
});
}
+// TODO: investigate how to fix the forwarding channel failures when enabled
+// @Override
+// public ClientSession getSession() {
+// return (ClientSession) super.getSession();
+// }
+
protected void addChannelSignalRequestHandlers(EventNotifier<String> notifier) {
addRequestHandler(new ExitStatusChannelRequestHandler(exitStatusHolder, notifier));
addRequestHandler(new ExitSignalChannelRequestHandler(exitSignalHolder, notifier));
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 459a659..b3f8c67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -20,7 +20,6 @@ package org.apache.sshd.client.channel;
import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.sshd.common.SshConstants;
@@ -35,6 +34,7 @@ import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ThreadUtils;
/**
@@ -46,7 +46,6 @@ public class ChannelSession extends AbstractClientChannel {
private ExecutorService pumperService;
private Future<?> pumper;
- private boolean shutdownPumper;
public ChannelSession() {
super("session");
@@ -93,12 +92,9 @@ public class ChannelSession extends AbstractClientChannel {
if (service == null) {
pumperService = ThreadUtils.newSingleThreadExecutor("ClientInputStreamPump[" + this.toString() + "]");
} else {
- pumperService = service;
+ pumperService = ThreadUtils.noClose(service);
}
- // shutdown the temporary executor service if had to create it
- shutdownPumper = (pumperService != service) || isShutdownOnExit();
-
// Interrupt does not really work and the thread will only exit when
// the call to read() will return. So ensure this thread is a daemon
// to avoid blocking the whole app
@@ -130,7 +126,7 @@ public class ChannelSession extends AbstractClientChannel {
@Override
protected void doCloseImmediately() {
- if ((pumper != null) && (pumperService != null) && shutdownPumper && (!pumperService.isShutdown())) {
+ if ((pumper != null) && (pumperService != null) && (!pumperService.isShutdown())) {
try {
if (!pumper.isDone()) {
pumper.cancel(true);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
index 0aa5039..5ec3a69 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/AbstractClientSession.java
@@ -443,8 +443,8 @@ public abstract class AbstractClientSession extends AbstractSession implements C
@Override
public KeyExchangeFuture switchToNoneCipher() throws IOException {
- if (!(currentService instanceof AbstractConnectionService<?>)
- || !GenericUtils.isEmpty(((AbstractConnectionService<?>) currentService).getChannels())) {
+ if (!(currentService instanceof AbstractConnectionService)
+ || !GenericUtils.isEmpty(((AbstractConnectionService) currentService).getChannels())) {
throw new IllegalStateException("The switch to the none cipher must be done immediately after authentication");
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
index 1743a8e..497e61e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
@@ -39,7 +39,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ClientConnectionService extends AbstractConnectionService<AbstractClientSession> implements ClientSessionHolder {
+public class ClientConnectionService
+ extends AbstractConnectionService
+ implements ClientSessionHolder {
private ScheduledFuture<?> heartBeat;
@@ -53,6 +55,11 @@ public class ClientConnectionService extends AbstractConnectionService<AbstractC
}
@Override
+ public AbstractClientSession getSession() {
+ return (AbstractClientSession) super.getSession();
+ }
+
+ @Override
public void start() {
ClientSession session = getClientSession();
if (!session.isAuthenticated()) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 94e2d20..6d3c78e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -29,7 +29,6 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -59,7 +58,8 @@ import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
import org.apache.sshd.common.util.closeable.IoBaseCloseable;
import org.apache.sshd.common.util.io.IoUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
/**
* Provides common client/server channel functionality
@@ -68,7 +68,7 @@ import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
*/
public abstract class AbstractChannel
extends AbstractInnerCloseable
- implements Channel, ExecutorServiceConfigurer {
+ implements Channel, ExecutorServiceCarrier {
/**
* Default growth factor function used to resize response buffers
@@ -95,7 +95,6 @@ public abstract class AbstractChannel
private int recipient = -1;
private Session sessionInstance;
private ExecutorService executor;
- private boolean shutdownExecutor;
private final List<RequestHandler<Channel>> requestHandlers = new CopyOnWriteArrayList<>();
private final Window localWindow;
@@ -115,19 +114,20 @@ public abstract class AbstractChannel
}
protected AbstractChannel(boolean client, Collection<? extends RequestHandler<Channel>> handlers) {
- this("", client, handlers);
+ this("", client, handlers, null);
}
protected AbstractChannel(String discriminator, boolean client) {
- this(discriminator, client, Collections.emptyList());
+ this(discriminator, client, Collections.emptyList(), null);
}
- protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers) {
+ protected AbstractChannel(String discriminator, boolean client, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executorService) {
super(discriminator);
gracefulFuture = new DefaultCloseFuture(discriminator, lock);
localWindow = new Window(this, null, client, true);
remoteWindow = new Window(this, null, client, false);
channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, getClass().getClassLoader(), channelListeners);
+ executor = executorService;
addRequestHandlers(handlers);
}
@@ -189,21 +189,6 @@ public abstract class AbstractChannel
}
@Override
- public void setExecutorService(ExecutorService service) {
- executor = service;
- }
-
- @Override
- public boolean isShutdownOnExit() {
- return shutdownExecutor;
- }
-
- @Override
- public void setShutdownOnExit(boolean shutdown) {
- shutdownExecutor = shutdown;
- }
-
- @Override
public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
return channelStreamPacketWriterResolver;
}
@@ -655,7 +640,7 @@ public abstract class AbstractChannel
}
ExecutorService service = getExecutorService();
- if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+ if ((service != null) && (!service.isShutdown())) {
Collection<?> running = service.shutdownNow();
if (debugEnabled) {
log.debug("close({})[immediately={}] shutdown executor service on close - running count={}",
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
index 5167365..243416c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
@@ -19,12 +19,13 @@
package org.apache.sshd.common.io;
-import java.util.concurrent.ExecutorService;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.FactoryManagerHolder;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
/**
@@ -36,12 +37,10 @@ public abstract class AbstractIoServiceFactory
private final FactoryManager manager;
private final ExecutorService executor;
- private final boolean shutdownExecutor;
- protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService, boolean shutdownOnExit) {
- manager = factoryManager;
- executor = executorService;
- shutdownExecutor = shutdownOnExit;
+ protected AbstractIoServiceFactory(FactoryManager factoryManager, ExecutorService executorService) {
+ manager = Objects.requireNonNull(factoryManager);
+ executor = Objects.requireNonNull(executorService);
}
@Override
@@ -55,15 +54,10 @@ public abstract class AbstractIoServiceFactory
}
@Override
- public final boolean isShutdownOnExit() {
- return shutdownExecutor;
- }
-
- @Override
protected void doCloseImmediately() {
try {
ExecutorService service = getExecutorService();
- if ((service != null) && isShutdownOnExit() && (!service.isShutdown())) {
+ if ((service != null) && (!service.isShutdown())) {
log.debug("Shutdown executor");
service.shutdownNow();
if (service.awaitTermination(5, TimeUnit.SECONDS)) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
index 1ea4e6e..91f3942 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactoryFactory.java
@@ -19,54 +19,37 @@
package org.apache.sshd.common.io;
-import java.util.concurrent.ExecutorService;
-
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public abstract class AbstractIoServiceFactoryFactory
extends AbstractLoggingBean
- implements IoServiceFactoryFactory, ExecutorServiceConfigurer {
+ implements IoServiceFactoryFactory {
- private ExecutorService executorService;
- private boolean shutdownExecutor;
+ private Factory<ExecutorService> executorServiceFactory;
/**
- * @param executors The {@link ExecutorService} to use for spawning threads.
- * If {@code null} then an internal service is allocated - in which case it
- * is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt>
- * parameter value
- * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()}
- * will be called (unless it is an internally allocated service which is always
- * closed)
+ * @param factory The {@link ExecutorService} factory to use for spawning threads.
+ * If {@code null} then an internal service will be allocated.
*/
- protected AbstractIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
- executorService = executors;
- shutdownExecutor = shutdownOnExit;
- }
-
- @Override
- public ExecutorService getExecutorService() {
- return executorService;
+ protected AbstractIoServiceFactoryFactory(Factory<ExecutorService> factory) {
+ executorServiceFactory = factory;
}
- @Override
- public void setExecutorService(ExecutorService service) {
- executorService = service;
-
+ public Factory<ExecutorService> getExecutorServiceFactory() {
+ return executorServiceFactory;
}
@Override
- public boolean isShutdownOnExit() {
- return shutdownExecutor;
+ public void setExecutorServiceFactory(Factory<ExecutorService> factory) {
+ executorServiceFactory = factory;
}
- @Override
- public void setShutdownOnExit(boolean shutdown) {
- shutdownExecutor = shutdown;
+ protected ExecutorService newExecutor() {
+ return executorServiceFactory != null ? executorServiceFactory.create() : null;
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
index 74775ff..f30331a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/DefaultIoServiceFactoryFactory.java
@@ -20,11 +20,11 @@ package org.apache.sshd.common.io;
import java.util.Iterator;
import java.util.ServiceLoader;
-import java.util.concurrent.ExecutorService;
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.util.GenericUtils;
-import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +37,11 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact
private IoServiceFactoryFactory factory;
protected DefaultIoServiceFactoryFactory() {
- this(null, true);
+ this(null);
}
- protected DefaultIoServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
- super(executors, shutdownOnExit);
+ protected DefaultIoServiceFactoryFactory(Factory<ExecutorService> factory) {
+ super(factory);
}
@Override
@@ -57,10 +57,9 @@ public class DefaultIoServiceFactoryFactory extends AbstractIoServiceFactoryFact
synchronized (this) {
if (factory == null) {
factory = newInstance(IoServiceFactoryFactory.class);
- if (factory instanceof ExecutorServiceConfigurer) {
- ExecutorServiceConfigurer configurer = (ExecutorServiceConfigurer) factory;
- configurer.setExecutorService(getExecutorService());
- configurer.setShutdownOnExit(isShutdownOnExit());
+ Factory<ExecutorService> executorServiceFactory = getExecutorServiceFactory();
+ if (executorServiceFactory != null) {
+ factory.setExecutorServiceFactory(executorServiceFactory);
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
index 92f1e73..74b2237 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoServiceFactoryFactory.java
@@ -18,13 +18,16 @@
*/
package org.apache.sshd.common.io;
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.util.threads.ExecutorService;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-@FunctionalInterface
public interface IoServiceFactoryFactory {
IoServiceFactory create(FactoryManager manager);
+
+ void setExecutorServiceFactory(Factory<ExecutorService> factory);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
index 1cdb651..12fd522 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactory.java
@@ -20,7 +20,6 @@ package org.apache.sshd.common.io.nio2;
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.FactoryManager;
@@ -29,6 +28,7 @@ import org.apache.sshd.common.io.AbstractIoServiceFactory;
import org.apache.sshd.common.io.IoAcceptor;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.common.util.threads.ThreadUtils;
/**
@@ -38,12 +38,11 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory {
private final AsynchronousChannelGroup group;
- public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service, boolean shutdownOnExit) {
+ public Nio2ServiceFactory(FactoryManager factoryManager, ExecutorService service) {
super(factoryManager,
- service == null ? ThreadUtils.newFixedThreadPool(factoryManager.toString() + "-nio2", getNioWorkers(factoryManager)) : service,
- service == null || shutdownOnExit);
+ ThreadUtils.newFixedThreadPoolIf(service, factoryManager.toString() + "-nio2", getNioWorkers(factoryManager)));
try {
- group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.protectExecutorServiceShutdown(getExecutorService(), isShutdownOnExit()));
+ group = AsynchronousChannelGroup.withThreadPool(ThreadUtils.noClose(getExecutorService()));
} catch (IOException e) {
log.warn("Failed (" + e.getClass().getSimpleName() + " to start async. channel group: " + e.getMessage());
if (log.isDebugEnabled()) {
@@ -71,12 +70,10 @@ public class Nio2ServiceFactory extends AbstractIoServiceFactory {
group.shutdownNow();
// if we protect the executor then the await will fail since we didn't really shut it down...
- if (isShutdownOnExit()) {
- if (group.awaitTermination(5, TimeUnit.SECONDS)) {
- log.debug("Group successfully shut down");
- } else {
- log.debug("Not all group tasks terminated");
- }
+ if (group.awaitTermination(5, TimeUnit.SECONDS)) {
+ log.debug("Group successfully shut down");
+ } else {
+ log.debug("Not all group tasks terminated");
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
index 3f64b81..d99874c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2ServiceFactoryFactory.java
@@ -20,18 +20,19 @@ package org.apache.sshd.common.io.nio2;
import java.nio.channels.AsynchronousChannel;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.io.AbstractIoServiceFactoryFactory;
import org.apache.sshd.common.io.IoServiceFactory;
+import org.apache.sshd.common.util.threads.ExecutorService;
/**
*/
public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory {
public Nio2ServiceFactoryFactory() {
- this(null, true);
+ this(null);
}
/**
@@ -39,18 +40,15 @@ public class Nio2ServiceFactoryFactory extends AbstractIoServiceFactoryFactory {
* If {@code null} then an internal service is allocated - in which case it
* is automatically shutdown regardless of the value of the <tt>shutdownOnExit</tt>
* parameter value
- * @param shutdownOnExit If {@code true} then the {@link ExecutorService#shutdownNow()}
- * will be called (unless it is an internally allocated service which is always
- * closed)
*/
- public Nio2ServiceFactoryFactory(ExecutorService executors, boolean shutdownOnExit) {
- super(executors, shutdownOnExit);
+ public Nio2ServiceFactoryFactory(Factory<ExecutorService> executors) {
+ super(executors);
// Make sure NIO2 is available
Objects.requireNonNull(AsynchronousChannel.class, "Missing NIO2 class");
}
@Override
public IoServiceFactory create(FactoryManager manager) {
- return new Nio2ServiceFactory(manager, getExecutorService(), isShutdownOnExit());
+ return new Nio2ServiceFactory(manager, newExecutor());
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 78b1257..cd711e0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -52,7 +52,6 @@ import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
-import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
@@ -66,10 +65,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
/**
* Base implementation of ConnectionService.
*
- * @param <S> Type of {@link AbstractSession} being used
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractConnectionService<S extends AbstractSession>
+public abstract class AbstractConnectionService
extends AbstractInnerCloseable
implements ConnectionService {
/**
@@ -105,10 +103,10 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
private final Collection<PortForwardingEventListener> listeners = new CopyOnWriteArraySet<>();
private final Collection<PortForwardingEventListenerManager> managersHolder = new CopyOnWriteArraySet<>();
private final PortForwardingEventListener listenerProxy;
- private final S sessionInstance;
+ private final AbstractSession sessionInstance;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
- protected AbstractConnectionService(S session) {
+ protected AbstractConnectionService(AbstractSession session) {
sessionInstance = Objects.requireNonNull(session, "No session");
listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
}
@@ -166,7 +164,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
}
@Override
- public S getSession() {
+ public AbstractSession getSession() {
return sessionInstance;
}
@@ -178,7 +176,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
@Override
public ForwardingFilter getForwardingFilter() {
ForwardingFilter forwarder;
- S session = getSession();
+ AbstractSession session = getSession();
synchronized (forwarderHolder) {
forwarder = forwarderHolder.get();
if (forwarder != null) {
@@ -202,7 +200,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
super.preClose();
}
- protected ForwardingFilter createForwardingFilter(S session) {
+ protected ForwardingFilter createForwardingFilter(AbstractSession session) {
FactoryManager manager =
Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
ForwardingFilterFactory factory =
@@ -215,7 +213,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
@Override
public X11ForwardSupport getX11ForwardSupport() {
X11ForwardSupport x11Support;
- S session = getSession();
+ AbstractSession session = getSession();
synchronized (x11ForwardHolder) {
x11Support = x11ForwardHolder.get();
if (x11Support != null) {
@@ -232,14 +230,14 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
return x11Support;
}
- protected X11ForwardSupport createX11ForwardSupport(S session) {
+ protected X11ForwardSupport createX11ForwardSupport(AbstractSession session) {
return new DefaultX11ForwardSupport(this);
}
@Override
public AgentForwardSupport getAgentForwardSupport() {
AgentForwardSupport agentForward;
- S session = getSession();
+ AbstractSession session = getSession();
synchronized (agentForwardHolder) {
agentForward = agentForwardHolder.get();
if (agentForward != null) {
@@ -257,7 +255,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
return agentForward;
}
- protected AgentForwardSupport createAgentForwardSupport(S session) {
+ protected AgentForwardSupport createAgentForwardSupport(AbstractSession session) {
return new DefaultAgentForwardSupport(this);
}
@@ -275,7 +273,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
@Override
public int registerChannel(Channel channel) throws IOException {
- Session session = getSession();
+ AbstractSession session = getSession();
int maxChannels = session.getIntProperty(MAX_CONCURRENT_CHANNELS_PROP, DEFAULT_MAX_CHANNELS);
int curSize = channels.size();
if (curSize > maxChannels) {
@@ -584,7 +582,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
return handler;
}
- Session s = getSession();
+ AbstractSession s = getSession();
return (s == null) ? null : s.resolveUnknownChannelReferenceHandler();
}
@@ -614,7 +612,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
return;
}
- Session session = getSession();
+ AbstractSession session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Channel channel = NamedFactory.create(manager.getChannelFactories(), type);
if (channel == null) {
@@ -673,7 +671,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message);
}
- Session session = getSession();
+ AbstractSession session = getSession();
Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE,
Long.SIZE + GenericUtils.length(message) + GenericUtils.length(lang));
buf.putInt(sender);
@@ -701,7 +699,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
this, req, wantReply);
}
- Session session = getSession();
+ AbstractSession session = getSession();
FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
Collection<RequestHandler<ConnectionService>> handlers = manager.getGlobalRequestHandlers();
if (GenericUtils.size(handlers) > 0) {
@@ -755,18 +753,18 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
byte cmd = RequestHandler.Result.ReplySuccess.equals(result)
? SshConstants.SSH_MSG_REQUEST_SUCCESS
: SshConstants.SSH_MSG_REQUEST_FAILURE;
- Session session = getSession();
+ AbstractSession session = getSession();
Buffer rsp = session.createBuffer(cmd, 2);
return session.writePacket(rsp);
}
protected void requestSuccess(Buffer buffer) throws Exception {
- S s = getSession();
+ AbstractSession s = (AbstractSession) getSession();
s.requestSuccess(buffer);
}
protected void requestFailure(Buffer buffer) throws Exception {
- S s = getSession();
+ AbstractSession s = (AbstractSession) getSession();
s.requestFailure(buffer);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
new file mode 100644
index 0000000..fe7ccbe
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.threads;
+
+import org.apache.sshd.common.Closeable;
+
+public interface ExecutorService extends java.util.concurrent.ExecutorService, Closeable {
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
index 71c8c36..7e9378b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceCarrier.java
@@ -19,8 +19,6 @@
package org.apache.sshd.common.util.threads;
-import java.util.concurrent.ExecutorService;
-
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
@@ -30,10 +28,4 @@ public interface ExecutorServiceCarrier {
*/
ExecutorService getExecutorService();
- /**
- * @return If {@code true} then the {@link ExecutorService#shutdownNow()}
- * will be called (unless it is an internally allocated service which is always
- * closed)
- */
- boolean isShutdownOnExit();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
deleted file mode 100644
index e8519c8..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ExecutorServiceConfigurer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sshd.common.util.threads;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-public interface ExecutorServiceConfigurer extends ExecutorServiceCarrier {
- void setExecutorService(ExecutorService service);
-
- void setShutdownOnExit(boolean shutdown);
-}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
index 803e87e..34a935b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
@@ -18,26 +18,33 @@
*/
package org.apache.sshd.common.util.threads;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
+import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
/**
@@ -66,34 +73,17 @@ public final class ThreadUtils {
* value of the <tt>shutdownOnExit</tt> parameter
*/
public static ExecutorService protectExecutorServiceShutdown(final ExecutorService executorService, boolean shutdownOnExit) {
- if (executorService == null || shutdownOnExit) {
+ if (executorService == null || shutdownOnExit || executorService instanceof NoCloseExecutor) {
return executorService;
} else {
- return (ExecutorService) Proxy.newProxyInstance(
- resolveDefaultClassLoader(executorService),
- new Class<?>[]{ExecutorService.class},
- new InvocationHandler() {
- private final AtomicBoolean stopped = new AtomicBoolean(false);
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- String name = method.getName();
- if ("isShutdown".equals(name)) {
- return stopped.get();
- } else if ("shutdown".equals(name)) {
- stopped.set(true);
- return null; // void...
- } else if ("shutdownNow".equals(name)) {
- stopped.set(true);
- return Collections.emptyList();
- } else {
- return method.invoke(executorService, args);
- }
- }
- });
+ return new NoCloseExecutor(executorService);
}
}
+ /**
+ * Shorthand for {@link #protectExecutorServiceShutdown(ExecutorService, boolean)}
+ * with {@code shutdownOnExit} set to {@code false}.
+ *
+ * @param executorService The service to protect - may be {@code null}
+ * @return {@code null} if the argument is {@code null}, the argument itself
+ * if it already is a {@link NoCloseExecutor}, otherwise a new
+ * {@link NoCloseExecutor} wrapping it
+ */
+ public static ExecutorService noClose(ExecutorService executorService) {
+ return protectExecutorServiceShutdown(executorService, false);
+ }
+
public static ClassLoader resolveDefaultClassLoader(Object anchor) {
return resolveDefaultClassLoader(anchor == null ? null : anchor.getClass());
}
@@ -180,16 +170,26 @@ public final class ThreadUtils {
return cls;
}
+ /**
+ * Returns the supplied executor, or creates a fixed-size pool if none was supplied.
+ *
+ * @param executorService The executor to use - may be {@code null}
+ * @param poolName Pool name passed to {@link #newFixedThreadPool(String, int)}
+ * when a new pool has to be created
+ * @param nThreads Thread count passed to {@link #newFixedThreadPool(String, int)}
+ * when a new pool has to be created
+ * @return The supplied executor (unchanged) if it is not {@code null},
+ * otherwise the result of {@code newFixedThreadPool(poolName, nThreads)}
+ */
+ public static ExecutorService newFixedThreadPoolIf(ExecutorService executorService, String poolName, int nThreads) {
+ return executorService == null ? newFixedThreadPool(poolName, nThreads) : executorService;
+ }
+
public static ExecutorService newFixedThreadPool(String poolName, int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
+ return new ThreadPoolExecutor(
+ nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, // TODO make this configurable
new LinkedBlockingQueue<>(),
new SshdThreadFactory(poolName),
new ThreadPoolExecutor.CallerRunsPolicy());
}
+ /**
+ * Returns the supplied executor, or creates a cached pool if none was supplied.
+ *
+ * @param executorService The executor to use - may be {@code null}
+ * @param poolName Pool name passed to {@link #newCachedThreadPool(String)}
+ * when a new pool has to be created
+ * @return The supplied executor (unchanged) if it is not {@code null},
+ * otherwise the result of {@code newCachedThreadPool(poolName)}
+ */
+ public static ExecutorService newCachedThreadPoolIf(ExecutorService executorService, String poolName) {
+ return executorService == null ? newCachedThreadPool(poolName) : executorService;
+ }
+
public static ExecutorService newCachedThreadPool(String poolName) {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // TODO make this configurable
+ return new ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE, // TODO make this configurable
60L, TimeUnit.SECONDS, // TODO make this configurable
new SynchronousQueue<>(),
new SshdThreadFactory(poolName),
@@ -248,4 +248,218 @@ public final class ThreadUtils {
return t;
}
}
+
+ /**
+ * Wrapper that forwards task submission to another executor but never
+ * shuts that executor down: {@code shutdown()}, {@code shutdownNow()} and
+ * {@code close(...)} only complete this wrapper's own close future.
+ * <p>
+ * NOTE(review): the task-submission methods forward to the delegate
+ * without checking {@link #isShutdown()}, so tasks are still accepted
+ * after this wrapper has been shut down/closed - confirm this is intended.
+ * </p>
+ */
+ public static class NoCloseExecutor implements ExecutorService {
+
+ // The wrapped executor - receives every submitted task, never a shutdown request
+ protected final ExecutorService executor;
+ // Tracks the "closed" state of this wrapper only
+ protected final CloseFuture closeFuture;
+
+ /**
+ * @param executor The executor to delegate task execution to (stored as-is,
+ * no {@code null} check is performed here)
+ */
+ public NoCloseExecutor(ExecutorService executor) {
+ this.executor = executor;
+ // presumably "no id / no external lock" - confirm against DefaultCloseFuture
+ closeFuture = new DefaultCloseFuture(null, null);
+ }
+
+ // ---- task submission: forwarded verbatim to the wrapped executor ----
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return executor.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return executor.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return executor.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return executor.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return executor.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return executor.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return executor.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ // ---- lifecycle: affects only this wrapper, the delegate is left running ----
+
+ /**
+ * Marks this wrapper as closed; the wrapped executor is not shut down.
+ */
+ @Override
+ public void shutdown() {
+ close(true);
+ }
+
+ /**
+ * Marks this wrapper as closed; the wrapped executor is not shut down.
+ *
+ * @return Always an empty list - pending tasks stay with the wrapped executor
+ */
+ @Override
+ public List<Runnable> shutdownNow() {
+ close(true);
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return Same value as {@link #isClosed()}
+ */
+ @Override
+ public boolean isShutdown() {
+ return isClosed();
+ }
+
+ /**
+ * @return Same value as {@link #isClosed()} - the state of the wrapped
+ * executor is not consulted
+ */
+ @Override
+ public boolean isTerminated() {
+ return isClosed();
+ }
+
+ /**
+ * Waits on this wrapper's close future, not on the wrapped executor.
+ *
+ * @param timeout Maximum time to wait
+ * @param unit Unit of {@code timeout}
+ * @return The result of {@code closeFuture.await(timeout, unit)} - presumably
+ * {@code true} if the future completed in time (confirm against CloseFuture)
+ * @throws InterruptedException If the wait fails with an {@link IOException};
+ * the original exception is attached as the cause since this signature
+ * declares no other checked exception
+ */
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ try {
+ return closeFuture.await(timeout, unit);
+ } catch (IOException e) {
+ throw (InterruptedException) new InterruptedException().initCause(e);
+ }
+ }
+
+ /**
+ * Completes the close future right away.
+ *
+ * @param immediately Ignored - both modes behave the same
+ * @return This wrapper's close future
+ */
+ @Override
+ public CloseFuture close(boolean immediately) {
+ closeFuture.setClosed();
+ return closeFuture;
+ }
+
+ @Override
+ public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+ closeFuture.addListener(listener);
+ }
+
+ @Override
+ public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+ closeFuture.removeListener(listener);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closeFuture.isClosed();
+ }
+
+ /**
+ * @return Same value as {@link #isClosed()} - there is no separate "closing" phase
+ */
+ @Override
+ public boolean isClosing() {
+ return isClosed();
+ }
+
+ }
+
+ /**
+ * A JDK {@link java.util.concurrent.ThreadPoolExecutor} that also implements
+ * the project's closeable {@link ExecutorService}. The {@code Closeable}
+ * methods are delegated to an inner {@link DelegateCloseable}, whose close
+ * hooks call this pool's {@code shutdown()} / {@code shutdownNow()}.
+ */
+ public static class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor implements ExecutorService {
+
+ // Carries the close state/listeners on behalf of this pool
+ final DelegateCloseable closeable = new DelegateCloseable();
+
+ /**
+ * Inner (non-static) closeable whose close hooks shut down the enclosing pool.
+ */
+ class DelegateCloseable extends AbstractCloseable {
+ DelegateCloseable() {
+ }
+
+ /**
+ * Graceful close: calls {@code shutdown()} (presumably the enclosing
+ * pool's - confirm AbstractCloseable declares no such method) and
+ * returns the close future without completing it here.
+ */
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ shutdown();
+ return closeFuture;
+ }
+
+ /**
+ * Immediate close: calls {@code shutdownNow()} and then the inherited
+ * {@code doCloseImmediately()}.
+ */
+ @Override
+ protected void doCloseImmediately() {
+ shutdownNow();
+ super.doCloseImmediately();
+ }
+
+ // Completes the close future directly; no caller is visible in this class
+ void setClosed() {
+ closeFuture.setClosed();
+ }
+ }
+
+ // ---- constructors: mirror those of java.util.concurrent.ThreadPoolExecutor ----
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+ }
+
+ /**
+ * Pool termination hook: runs the closeable's immediate-close path.
+ * NOTE(review): that path calls {@code shutdownNow()} again before the
+ * inherited {@code doCloseImmediately()} - presumably harmless on an
+ * already terminated pool, and presumably what completes the close
+ * future; confirm against AbstractCloseable.
+ */
+ @Override
+ protected void terminated() {
+ closeable.doCloseImmediately();
+ }
+
+ // ---- JDK lifecycle methods: plain pass-through to the superclass ----
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return super.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return super.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminating() {
+ return super.isTerminating();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return super.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return super.awaitTermination(timeout, unit);
+ }
+
+ // ---- Closeable methods: delegated to the inner closeable ----
+
+ /**
+ * @param immediately Passed through to the inner closeable's {@code close}
+ * @return The future returned by the inner closeable
+ */
+ @Override
+ public CloseFuture close(boolean immediately) {
+ return closeable.close(immediately);
+ }
+
+ @Override
+ public void addCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+ closeable.addCloseFutureListener(listener);
+ }
+
+ @Override
+ public void removeCloseFutureListener(SshFutureListener<CloseFuture> listener) {
+ closeable.removeCloseFutureListener(listener);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closeable.isClosed();
+ }
+
+ @Override
+ public boolean isClosing() {
+ return closeable.isClosing();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 2f3a766..b9da5a7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -35,6 +35,7 @@ import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ExecutorService;
import org.apache.sshd.server.session.ServerSession;
/**
@@ -46,17 +47,19 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
protected final AtomicBoolean exitStatusSent = new AtomicBoolean(false);
- protected AbstractServerChannel() {
- this(Collections.emptyList());
+ protected AbstractServerChannel(ExecutorService executor) {
+ super("", false, Collections.emptyList(), executor);
}
- protected AbstractServerChannel(Collection<? extends RequestHandler<Channel>> handlers) {
- this("", handlers);
+ protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers, ExecutorService executor) {
+ super(discriminator, false, handlers, executor);
}
- protected AbstractServerChannel(String discriminator, Collection<? extends RequestHandler<Channel>> handlers) {
- super(discriminator, false, handlers);
- }
+// TODO: investigate how to fix the forwarding channel failures when enabled
+// @Override
+// public ServerSession getSession() {
+// return (ServerSession) super.getSession();
+// }
@Override
public ServerSession getServerSession() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/7b35bb36/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index cfaa499..78f5861 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -77,7 +77,7 @@ import org.apache.sshd.server.session.ServerSession;
import org.apache.sshd.server.x11.X11ForwardSupport;
/**
- * TODO Add javadoc
+ * TODO Add javadocWindowInitTest
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
@@ -102,12 +102,17 @@ public class ChannelSession extends AbstractServerChannel {
}
public ChannelSession(Collection<? extends RequestHandler<Channel>> handlers) {
- super(handlers);
+ super("", handlers, null);
commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), lock);
}
@Override
+ public ServerSession getSession() {
+ return (ServerSession) super.getSession();
+ }
+
+ @Override
public void handleWindowAdjust(Buffer buffer) throws IOException {
super.handleWindowAdjust(buffer);
if (asyncOut != null) {