You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/01/28 11:44:58 UTC
mina-sshd git commit: [SSHD-632] Provide more flexibility in
overriding Nio2 related classes - e.g., acceptor, connector, session
Repository: mina-sshd
Updated Branches:
refs/heads/master c644ad82d -> aa0d8d37f
[SSHD-632] Provide more flexibility in overriding Nio2 related classes - e.g., acceptor, connector, session
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa0d8d37
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa0d8d37
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa0d8d37
Branch: refs/heads/master
Commit: aa0d8d37fe12b368fb4110759e4e0dac4e683395
Parents: c644ad8
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 28 12:44:44 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 28 12:44:44 2016 +0200
----------------------------------------------------------------------
.../sshd/common/FactoryManagerHolder.java | 30 +++
.../common/io/AbstractIoServiceFactory.java | 6 +-
.../sshd/common/io/nio2/Nio2Acceptor.java | 56 +++-
.../sshd/common/io/nio2/Nio2Connector.java | 102 +++++---
.../apache/sshd/common/io/nio2/Nio2Service.java | 26 +-
.../apache/sshd/common/io/nio2/Nio2Session.java | 261 ++++++++++++-------
.../org/apache/sshd/common/session/Session.java | 8 +-
7 files changed, 346 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
new file mode 100644
index 0000000..810ac09
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManagerHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface FactoryManagerHolder {
+ /**
+ * @return The currently associated {@link FactoryManager}
+ */
+ FactoryManager getFactoryManager();
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
index 73a91ca..33cb7ce 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/AbstractIoServiceFactory.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
@@ -30,7 +31,9 @@ import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractIoServiceFactory extends AbstractCloseable implements IoServiceFactory, ExecutorServiceCarrier {
+public abstract class AbstractIoServiceFactory
+ extends AbstractCloseable
+ implements IoServiceFactory, FactoryManagerHolder, ExecutorServiceCarrier {
private final FactoryManager manager;
private final ExecutorService executor;
@@ -42,6 +45,7 @@ public abstract class AbstractIoServiceFactory extends AbstractCloseable impleme
shutdownExecutor = shutdownOnExit;
}
+ @Override
public final FactoryManager getFactoryManager() {
return manager;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
index ebc590d..8656c51 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -24,6 +24,7 @@ import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -36,26 +37,29 @@ import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoAcceptor;
import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.util.ValidateUtils;
/**
*/
public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
public static final int DEFAULT_BACKLOG = 0;
- private final Map<SocketAddress, AsynchronousServerSocketChannel> channels;
+ protected final Map<SocketAddress, AsynchronousServerSocketChannel> channels = new ConcurrentHashMap<>();
private int backlog = DEFAULT_BACKLOG;
public Nio2Acceptor(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
super(manager, handler, group);
- channels = new ConcurrentHashMap<SocketAddress, AsynchronousServerSocketChannel>();
backlog = PropertyResolverUtils.getIntProperty(manager, FactoryManager.SOCKET_BACKLOG, DEFAULT_BACKLOG);
}
@Override
public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
+ AsynchronousChannelGroup group = getChannelGroup();
for (SocketAddress address : addresses) {
- log.debug("Binding Nio2Acceptor to address {}", address);
- AsynchronousServerSocketChannel socket = AsynchronousServerSocketChannel.open(group);
+ if (log.isDebugEnabled()) {
+ log.debug("Binding Nio2Acceptor to address {}", address);
+ }
+ AsynchronousServerSocketChannel socket = openAsynchronousServerSocketChannel(address, group);
setOption(socket, FactoryManager.SOCKET_KEEPALIVE, StandardSocketOptions.SO_KEEPALIVE, null);
setOption(socket, FactoryManager.SOCKET_LINGER, StandardSocketOptions.SO_LINGER, null);
setOption(socket, FactoryManager.SOCKET_RCVBUF, StandardSocketOptions.SO_RCVBUF, null);
@@ -65,10 +69,25 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
socket.bind(address, backlog);
SocketAddress local = socket.getLocalAddress();
channels.put(local, socket);
- socket.accept(local, new AcceptCompletionHandler(socket));
+
+ CompletionHandler<AsynchronousSocketChannel, ? super SocketAddress> handler =
+ ValidateUtils.checkNotNull(createSocketCompletionHandler(channels, socket),
+ "No completion handler created for address=%s",
+ address);
+ socket.accept(local, handler);
}
}
+ protected AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(
+ SocketAddress address, AsynchronousChannelGroup group) throws IOException {
+ return AsynchronousServerSocketChannel.open(group);
+ }
+
+ protected CompletionHandler<AsynchronousSocketChannel, ? super SocketAddress> createSocketCompletionHandler(
+ Map<SocketAddress, AsynchronousServerSocketChannel> channelsMap, AsynchronousServerSocketChannel socket) throws IOException {
+ return new AcceptCompletionHandler(socket);
+ }
+
@Override
public void bind(SocketAddress address) throws IOException {
bind(Collections.singleton(address));
@@ -133,15 +152,15 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
super.doCloseImmediately();
}
- class AcceptCompletionHandler extends Nio2CompletionHandler<AsynchronousSocketChannel, SocketAddress> {
- private final AsynchronousServerSocketChannel socket;
+ protected class AcceptCompletionHandler extends Nio2CompletionHandler<AsynchronousSocketChannel, SocketAddress> {
+ protected final AsynchronousServerSocketChannel socket;
AcceptCompletionHandler(AsynchronousServerSocketChannel socket) {
this.socket = socket;
}
- @SuppressWarnings("synthetic-access")
@Override
+ @SuppressWarnings("synthetic-access")
protected void onCompleted(AsynchronousSocketChannel result, SocketAddress address) {
// Verify that the address has not been unbound
if (!channels.containsKey(address)) {
@@ -151,7 +170,8 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
Nio2Session session = null;
try {
// Create a session
- session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
+ IoHandler handler = getIoHandler();
+ session = ValidateUtils.checkNotNull(createSession(Nio2Acceptor.this, address, result, handler), "No NIO2 session created");
handler.sessionCreated(session);
sessions.put(session.getId(), session);
session.startReading();
@@ -164,9 +184,9 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
session.close();
} catch (Throwable t) {
log.warn("Failed (" + t.getClass().getSimpleName() + ")"
- + " to close accepted connection from " + address
- + ": " + t.getMessage(),
- t);
+ + " to close accepted connection from " + address
+ + ": " + t.getMessage(),
+ t);
}
}
}
@@ -180,12 +200,20 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
}
@SuppressWarnings("synthetic-access")
+ protected Nio2Session createSession(Nio2Acceptor acceptor, SocketAddress address, AsynchronousSocketChannel channel, IoHandler handler) throws Throwable {
+ if (log.isTraceEnabled()) {
+ log.trace("createNio2Session({}) address={}", acceptor, address);
+ }
+ return new Nio2Session(acceptor, getFactoryManager(), handler, channel);
+ }
+
@Override
+ @SuppressWarnings("synthetic-access")
protected void onFailed(final Throwable exc, final SocketAddress address) {
if (channels.containsKey(address) && !disposing.get()) {
log.warn("Caught " + exc.getClass().getSimpleName()
- + " while accepting incoming connection from " + address
- + ": " + exc.getMessage(),
+ + " while accepting incoming connection from " + address
+ + ": " + exc.getMessage(),
exc);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
index 3a2f03e..2c2b6a5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
@@ -30,59 +30,102 @@ import org.apache.sshd.common.io.IoConnectFuture;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
/**
*/
public class Nio2Connector extends Nio2Service implements IoConnector {
-
public Nio2Connector(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
super(manager, handler, group);
}
@Override
public IoConnectFuture connect(SocketAddress address) {
- log.debug("Connecting to {}", address);
- final IoConnectFuture future = new DefaultIoConnectFuture(null);
+ if (log.isDebugEnabled()) {
+ log.debug("Connecting to {}", address);
+ }
+
+ IoConnectFuture future = new DefaultIoConnectFuture(null);
try {
- final AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(group);
+ AsynchronousChannelGroup group = getChannelGroup();
+ AsynchronousSocketChannel socket = openAsynchronousSocketChannel(address, group);
setOption(socket, FactoryManager.SOCKET_KEEPALIVE, StandardSocketOptions.SO_KEEPALIVE, null);
setOption(socket, FactoryManager.SOCKET_LINGER, StandardSocketOptions.SO_LINGER, null);
setOption(socket, FactoryManager.SOCKET_RCVBUF, StandardSocketOptions.SO_RCVBUF, null);
setOption(socket, FactoryManager.SOCKET_REUSEADDR, StandardSocketOptions.SO_REUSEADDR, Boolean.TRUE);
setOption(socket, FactoryManager.SOCKET_SNDBUF, StandardSocketOptions.SO_SNDBUF, null);
setOption(socket, FactoryManager.TCP_NODELAY, StandardSocketOptions.TCP_NODELAY, null);
- socket.connect(address, null, new Nio2CompletionHandler<Void, Object>() {
- @Override
- protected void onCompleted(Void result, Object attachment) {
+
+ Nio2CompletionHandler<Void, Object> completionHandler =
+ ValidateUtils.checkNotNull(createConnectionCompletionHandler(future, socket, getFactoryManager(), getIoHandler()),
+ "No connection completion handler created for %s",
+ address);
+ socket.connect(address, null, completionHandler);
+ } catch (Throwable exc) {
+ Throwable t = GenericUtils.peelException(exc);
+ if (log.isDebugEnabled()) {
+ log.debug("connect({}) failed ({}) to schedule connection: {}",
+ address, t.getClass().getSimpleName(), t.getMessage());
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("connect(" + address + ") connection failure details", t);
+ }
+ future.setException(t);
+ }
+ return future;
+ }
+
+ protected AsynchronousSocketChannel openAsynchronousSocketChannel(
+ SocketAddress address, AsynchronousChannelGroup group) throws IOException {
+ return AsynchronousSocketChannel.open(group);
+ }
+
+ protected Nio2CompletionHandler<Void, Object> createConnectionCompletionHandler(
+ final IoConnectFuture future, final AsynchronousSocketChannel socket, final FactoryManager manager, final IoHandler handler) {
+ return new Nio2CompletionHandler<Void, Object>() {
+ @Override
+ @SuppressWarnings("synthetic-access")
+ protected void onCompleted(Void result, Object attachment) {
+ try {
+ Nio2Session session = createSession(manager, handler, socket);
+ handler.sessionCreated(session);
+ sessions.put(session.getId(), session);
+ future.setSession(session);
+ session.startReading();
+ } catch (Throwable exc) {
+ Throwable t = GenericUtils.peelException(exc);
+ if (log.isDebugEnabled()) {
+ log.debug("onCompleted - failed {} to start session: {}",
+ t.getClass().getSimpleName(), t.getMessage());
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("onCompleted - session creation failure details", t);
+ }
try {
- Nio2Session session = new Nio2Session(Nio2Connector.this, manager, handler, socket);
- handler.sessionCreated(session);
- sessions.put(session.getId(), session);
- future.setSession(session);
- session.startReading();
- } catch (Throwable e) {
- try {
- socket.close();
- } catch (IOException t) {
- // Ignore
+ socket.close();
+ } catch (IOException err) {
+ if (log.isDebugEnabled()) {
+ log.debug("onCompleted - failed {} to close socket: {}", err.getClass().getSimpleName(), err.getMessage());
}
- future.setException(e);
}
+ future.setException(t);
}
+ }
- @Override
- protected void onFailed(final Throwable exc, final Object attachment) {
- future.setException(exc);
- }
- });
- } catch (IOException exc) {
- future.setException(exc);
- }
- return future;
+ @Override
+ protected void onFailed(final Throwable exc, final Object attachment) {
+ future.setException(exc);
+ }
+ };
+ }
+
+ protected Nio2Session createSession(FactoryManager manager, IoHandler handler, AsynchronousSocketChannel socket) throws Throwable {
+ return new Nio2Session(this, manager, handler, socket);
}
- static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
- DefaultIoConnectFuture(Object lock) {
+ public static class DefaultIoConnectFuture extends DefaultSshFuture<IoConnectFuture> implements IoConnectFuture {
+ public DefaultIoConnectFuture(Object lock) {
super(lock);
}
@@ -113,5 +156,4 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
setValue(exception);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 1df3731..080369a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
+import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoService;
@@ -42,12 +44,12 @@ import org.apache.sshd.common.util.closeable.CloseableUtils;
/**
*/
-public abstract class Nio2Service extends AbstractInnerCloseable implements IoService {
- protected final FactoryManager manager;
- protected final IoHandler handler;
+public abstract class Nio2Service extends AbstractInnerCloseable implements IoService, FactoryManagerHolder {
protected final Map<Long, IoSession> sessions;
protected final AtomicBoolean disposing = new AtomicBoolean();
- protected final AsynchronousChannelGroup group;
+ private final FactoryManager manager;
+ private final IoHandler handler;
+ private final AsynchronousChannelGroup group;
protected Nio2Service(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
if (log.isTraceEnabled()) {
@@ -59,9 +61,22 @@ public abstract class Nio2Service extends AbstractInnerCloseable implements IoSe
this.sessions = new ConcurrentHashMap<>();
}
+ protected AsynchronousChannelGroup getChannelGroup() {
+ return group;
+ }
+
+ @Override
+ public FactoryManager getFactoryManager() {
+ return manager;
+ }
+
+ public IoHandler getIoHandler() {
+ return handler;
+ }
+
public void dispose() {
try {
- long maxWait = CloseableUtils.getMaxCloseWaitTime(manager);
+ long maxWait = CloseableUtils.getMaxCloseWaitTime(getFactoryManager());
boolean successful = close(true).await(maxWait);
if (!successful) {
throw new SocketTimeoutException("Failed to receive closure confirmation within " + maxWait + " millis");
@@ -92,6 +107,7 @@ public abstract class Nio2Service extends AbstractInnerCloseable implements IoSe
}
protected <T> void setOption(NetworkChannel socket, String property, SocketOption<T> option, T defaultValue) throws IOException {
+ PropertyResolver manager = getFactoryManager();
String valStr = PropertyResolverUtils.getString(manager, property);
T val = defaultValue;
if (!GenericUtils.isEmpty(valStr)) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 5629d63..474251f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -32,12 +32,14 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoHandler;
-import org.apache.sshd.common.io.IoService;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
@@ -51,8 +53,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
private final long id = SESSION_ID_GENERATOR.incrementAndGet();
private final Nio2Service service;
- private final IoHandler handler;
- private final AsynchronousSocketChannel socket;
+ private final IoHandler ioHandler;
+ private final AsynchronousSocketChannel socketChannel;
private final Map<Object, Object> attributes = new HashMap<Object, Object>();
private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
@@ -61,13 +63,15 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
private final AtomicReference<Nio2DefaultIoWriteFuture> currentWrite = new AtomicReference<>();
public Nio2Session(Nio2Service service, FactoryManager manager, IoHandler handler, AsynchronousSocketChannel socket) throws IOException {
- this.service = service;
- this.manager = manager;
- this.handler = handler;
- this.socket = socket;
+ this.service = ValidateUtils.checkNotNull(service, "No service instance");
+ this.manager = ValidateUtils.checkNotNull(manager, "No factory manager");
+ this.ioHandler = ValidateUtils.checkNotNull(handler, "No IoHandler");
+ this.socketChannel = ValidateUtils.checkNotNull(socket, "No socket channel");
this.localAddress = socket.getLocalAddress();
this.remoteAddress = socket.getRemoteAddress();
- log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
+ if (log.isDebugEnabled()) {
+ log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
+ }
}
@Override
@@ -95,23 +99,39 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
return localAddress;
}
+ public AsynchronousSocketChannel getSocket() {
+ return socketChannel;
+ }
+
+ public IoHandler getIoHandler() {
+ return ioHandler;
+ }
+
public void suspend() {
+ AsynchronousSocketChannel socket = getSocket();
try {
- this.socket.shutdownInput();
+ socket.shutdownInput();
} catch (IOException e) {
- // Ignore
+ if (log.isDebugEnabled()) {
+ log.debug("suspend({}) failed {{}) to shutdown input: {}",
+ this, e.getClass().getSimpleName(), e.getMessage());
+ }
}
+
try {
- this.socket.shutdownOutput();
+ socket.shutdownOutput();
} catch (IOException e) {
- // Ignore
+ if (log.isDebugEnabled()) {
+ log.debug("suspend({}) failed {{}) to shutdown output: {}",
+ this, e.getClass().getSimpleName(), e.getMessage());
+ }
}
}
@Override
public IoWriteFuture write(Buffer buffer) {
if (log.isDebugEnabled()) {
- log.debug("Writing {} bytes", Integer.valueOf(buffer.available()));
+ log.debug("Writing {} bytes", buffer.available());
}
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
@@ -127,20 +147,28 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
return future;
}
- private void exceptionCaught(Throwable exc) {
+ protected void exceptionCaught(Throwable exc) {
if (!closeFuture.isClosed()) {
+ AsynchronousSocketChannel socket = getSocket();
if (isClosing() || !socket.isOpen()) {
close(true);
} else {
+ IoHandler handler = getIoHandler();
try {
if (log.isDebugEnabled()) {
- log.debug("Caught {}[{}] - calling handler", exc.getClass().getSimpleName(), exc.getMessage());
+ log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
+ this, exc.getClass().getSimpleName(), exc.getMessage());
}
handler.exceptionCaught(this, exc);
- } catch (Throwable t) {
+ } catch (Throwable e) {
+ Throwable t = GenericUtils.peelException(e);
if (log.isDebugEnabled()) {
- log.debug("Exception handler threw {}, closing the session: {}",
- t.getClass().getSimpleName(), t.getMessage());
+ log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
+ this, t.getClass().getSimpleName(), t.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
}
close(true);
}
@@ -163,23 +191,34 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
break;
}
}
+
+ AsynchronousSocketChannel socket = getSocket();
try {
socket.close();
} catch (IOException e) {
- log.info("Exception caught while closing socket", e);
+ log.info("doCloseImmediately(" + this + ") exception caught while closing socket", e);
}
+
service.sessionClosed(this);
super.doCloseImmediately();
+
+ IoHandler handler = getIoHandler();
try {
handler.sessionClosed(this);
- } catch (Exception e) {
- // Ignore
- log.debug("Exception caught while calling IoHandler#sessionClosed", e);
+ } catch (Throwable e) {
+ if (log.isDebugEnabled()) {
+ log.debug("doCloseImmediately({}) {} while calling IoHandler#sessionClosed: {}",
+ this, e.getClass().getSimpleName(), e.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("doCloseImmediately(" + this + ") IoHandler#sessionClosed failure details", e);
+ }
}
}
- @Override
- public IoService getService() {
+ @Override // co-variant return
+ public Nio2Service getService() {
return service;
}
@@ -213,93 +252,141 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
});
}
- protected void doReadCycle(final ByteBuffer buffer, final Readable bufReader) {
- final Nio2CompletionHandler<Integer, Object> completion = new Nio2CompletionHandler<Integer, Object>() {
+ protected void doReadCycle(ByteBuffer buffer, Readable bufReader) {
+ Nio2CompletionHandler<Integer, Object> completion =
+ ValidateUtils.checkNotNull(createReadCycleCompletionHandler(buffer, bufReader), "No completion handler created");
+ doReadCycle(buffer, completion);
+ }
+
+ protected Nio2CompletionHandler<Integer, Object> createReadCycleCompletionHandler(final ByteBuffer buffer, final Readable bufReader) {
+ return new Nio2CompletionHandler<Integer, Object>() {
@Override
- @SuppressWarnings("synthetic-access")
protected void onCompleted(Integer result, Object attachment) {
- try {
- if (result >= 0) {
- log.debug("Read {} bytes", result);
- buffer.flip();
- handler.messageReceived(Nio2Session.this, bufReader);
- if (!closeFuture.isClosed()) {
- // re-use reference for next iteration since we finished processing it
- buffer.clear();
- doReadCycle(buffer, this);
- } else {
- log.debug("IoSession has been closed, stop reading");
- }
- } else {
- log.debug("Socket has been disconnected, closing IoSession now");
- Nio2Session.this.close(true);
- }
- } catch (Throwable exc) {
- failed(exc, attachment);
- }
+ handleReadCycleCompletion(buffer, bufReader, this, result, attachment);
}
@Override
- @SuppressWarnings("synthetic-access")
protected void onFailed(Throwable exc, Object attachment) {
- exceptionCaught(exc);
+ handleReadCycleFailure(buffer, bufReader, exc, attachment);
}
};
- doReadCycle(buffer, completion);
+ }
+
+ protected void handleReadCycleCompletion(
+ ByteBuffer buffer, Readable bufReader, Nio2CompletionHandler<Integer, Object> completionHandler, Integer result, Object attachment) {
+ try {
+ if (result >= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("handleReadCycleCompletion({}) read {} bytes", this, result);
+ }
+ buffer.flip();
+
+ IoHandler handler = getIoHandler();
+ handler.messageReceived(this, bufReader);
+ if (!closeFuture.isClosed()) {
+ // re-use reference for next iteration since we finished processing it
+ buffer.clear();
+ doReadCycle(buffer, completionHandler);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("handleReadCycleCompletion({}) IoSession has been closed, stop reading", this);
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("handleReadCycleCompletion({}) Socket has been disconnected (result={}), closing IoSession now", this, result);
+ }
+ close(true);
+ }
+ } catch (Throwable exc) {
+ completionHandler.failed(exc, attachment);
+ }
+ }
+
+ protected void handleReadCycleFailure(ByteBuffer buffer, Readable bufReader, Throwable exc, Object attachment) {
+ exceptionCaught(exc);
}
protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
+ AsynchronousSocketChannel socket = getSocket();
socket.read(buffer, null, completion);
}
- @SuppressWarnings("synthetic-access")
- private void startWriting() {
- final Nio2DefaultIoWriteFuture future = writes.peek();
+ protected void startWriting() {
+ Nio2DefaultIoWriteFuture future = writes.peek();
if (future != null) {
if (currentWrite.compareAndSet(null, future)) {
try {
- final ByteBuffer buffer = future.getBuffer();
- socket.write(buffer, null, new Nio2CompletionHandler<Integer, Object>() {
- @Override
- protected void onCompleted(Integer result, Object attachment) {
- if (buffer.hasRemaining()) {
- try {
- socket.write(buffer, null, this);
- } catch (Throwable t) {
- log.debug("Exception caught while writing", t);
- future.setWritten();
- finishWrite();
- }
- } else {
- log.debug("Finished writing");
- future.setWritten();
- finishWrite();
- }
- }
-
- @Override
- protected void onFailed(Throwable exc, Object attachment) {
- future.setException(exc);
- exceptionCaught(exc);
- finishWrite();
- }
-
- private void finishWrite() {
- writes.remove(future);
- currentWrite.compareAndSet(future, null);
- startWriting();
- }
- });
- } catch (RuntimeException e) {
+ ByteBuffer buffer = future.getBuffer();
+ AsynchronousSocketChannel socket = getSocket();
+ Nio2CompletionHandler<Integer, Object> handler =
+ ValidateUtils.checkNotNull(createWriteCycleCompletionHandler(future, socket, buffer),
+ "No write cycle completion handler created");
+ socket.write(buffer, null, handler);
+ } catch (Throwable e) {
future.setWritten();
- throw e;
+ if (e instanceof RuntimeException) {
+ throw e;
+ } else {
+ throw new RuntimeSshException(e);
+ }
}
}
}
}
+ protected Nio2CompletionHandler<Integer, Object> createWriteCycleCompletionHandler(
+ final Nio2DefaultIoWriteFuture future, final AsynchronousSocketChannel socket, final ByteBuffer buffer) {
+ return new Nio2CompletionHandler<Integer, Object>() {
+ @Override
+ protected void onCompleted(Integer result, Object attachment) {
+ handleCompletedWriteCycle(future, socket, buffer, this, result, attachment);
+ }
+
+ @Override
+ protected void onFailed(Throwable exc, Object attachment) {
+ handleWriteCycleFailure(future, socket, buffer, exc, attachment);
+ }
+ };
+ }
+
+ protected void handleCompletedWriteCycle(
+ Nio2DefaultIoWriteFuture future, AsynchronousSocketChannel socket, ByteBuffer buffer,
+ Nio2CompletionHandler<Integer, Object> completionHandler, Integer result, Object attachment) {
+ if (buffer.hasRemaining()) {
+ try {
+ socket.write(buffer, null, completionHandler);
+ } catch (Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("handleCompletedWriteCycle(" + this + ") Exception caught while writing", t);
+ }
+ future.setWritten();
+ finishWrite(future);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("handleCompletedWriteCycle({}) finished writing", this);
+ }
+ future.setWritten();
+ finishWrite(future);
+ }
+ }
+
+ protected void handleWriteCycleFailure(
+ Nio2DefaultIoWriteFuture future, AsynchronousSocketChannel socket, ByteBuffer buffer, Throwable exc, Object attachment) {
+ future.setException(exc);
+ exceptionCaught(exc);
+ finishWrite(future);
+ }
+
+ protected void finishWrite(Nio2DefaultIoWriteFuture future) {
+ writes.remove(future);
+ currentWrite.compareAndSet(future, null);
+ startWriting();
+ }
+
@Override
public String toString() {
- return getClass().getSimpleName() + "[local=" + localAddress + ", remote=" + remoteAddress + "]";
+ return getClass().getSimpleName() + "[local=" + getLocalAddress() + ", remote=" + getRemoteAddress() + "]";
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa0d8d37/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index d47553b..2729cf6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerHolder;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.auth.MutableUserHolder;
@@ -48,6 +48,7 @@ public interface Session
extends KexFactoryManager,
SessionListenerManager,
ChannelListenerManager,
+ FactoryManagerHolder,
PropertyResolver,
Closeable,
MutableUserHolder {
@@ -102,11 +103,6 @@ public interface Session
String getServerVersion();
/**
- * @return the {@link FactoryManager} that has created this session, can not be {@code null}
- */
- FactoryManager getFactoryManager();
-
- /**
* Retrieve one of the negotiated values during the KEX stage
*
* @param paramType The request {@link KexProposalOption} value - ignored