You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/06/14 16:12:01 UTC

mina-sshd git commit: [SSHD-433] Cannot close and then reopen a remote forward on the same port

Repository: mina-sshd
Updated Branches:
  refs/heads/master 9fc3a98ab -> f71181ea6


[SSHD-433] Cannot close and then reopen a remote forward on the same port


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f71181ea
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f71181ea
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f71181ea

Branch: refs/heads/master
Commit: f71181ea6c513318fc597231bd37e738a661e951
Parents: 9fc3a98
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Sun Jun 14 17:11:51 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Sun Jun 14 17:11:51 2015 +0300

----------------------------------------------------------------------
 .../apache/sshd/common/SshdSocketAddress.java   |  23 +-
 .../common/forward/DefaultTcpipForwarder.java   | 261 ++++++++++++++-----
 .../common/forward/LocalForwardingEntry.java    |  88 +++++++
 .../apache/sshd/common/forward/SocksProxy.java  |  19 +-
 .../apache/sshd/common/io/nio2/Nio2Service.java |   9 +-
 .../apache/sshd/common/util/ValidateUtils.java  |   8 +-
 .../global/CancelTcpipForwardHandler.java       |   5 +-
 .../sshd/server/global/TcpipForwardHandler.java |   5 +-
 .../org/apache/sshd/PortForwardingTest.java     | 126 ++++++---
 9 files changed, 426 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/SshdSocketAddress.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/SshdSocketAddress.java b/sshd-core/src/main/java/org/apache/sshd/common/SshdSocketAddress.java
index 578572b..d8a351d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/SshdSocketAddress.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/SshdSocketAddress.java
@@ -77,22 +77,27 @@ public class SshdSocketAddress extends SocketAddress {
         return getHostName() + ":" + getPort();
     }
 
+    protected boolean isEquivalent(SshdSocketAddress that) {
+        if (that == null) {
+            return false;
+        } else if (that == this) {
+            return true;
+        } else if ((this.getPort() == that.getPort())
+                && Objects.equals(this.getHostName(), that.getHostName())) {
+           return true;
+       } else {
+           return false;   // debug breakpoint
+       }
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o)
-            return true;
         if (o == null)
             return false;
         if (getClass() != o.getClass())
             return false;
 
-        SshdSocketAddress that = (SshdSocketAddress) o;
-        if ((this.getPort() == that.getPort())
-         && Objects.equals(this.getHostName(), that.getHostName())) {
-            return true;
-        } else {
-            return false;   // debug breakpoint
-        }
+        return isEquivalent((SshdSocketAddress) o);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 7e9d149..184c029 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -29,17 +29,22 @@ import java.util.Set;
 import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.server.forward.ForwardingFilter;
@@ -52,16 +57,32 @@ import org.apache.sshd.server.forward.ForwardingFilter;
 public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable implements TcpipForwarder {
 
     private final ConnectionService service;
+    private final Factory<IoHandler> socksProxyIoHandlerFactory = new Factory<IoHandler>() {
+            @Override
+            public IoHandler create() {
+                return new SocksProxy(getConnectionService());
+            }
+        };
     private final Session session;
     private final Map<Integer, SshdSocketAddress> localToRemote = new HashMap<Integer, SshdSocketAddress>();
     private final Map<Integer, SshdSocketAddress> remoteToLocal = new HashMap<Integer, SshdSocketAddress>();
     private final Map<Integer, SocksProxy> dynamicLocal = new HashMap<Integer, SocksProxy>();
-    private final Set<SshdSocketAddress> localForwards = new HashSet<SshdSocketAddress>();
+    private final Set<LocalForwardingEntry> localForwards = new HashSet<LocalForwardingEntry>();
+    private final Factory<IoHandler> staticIoHandlerFactory = new Factory<IoHandler>() {
+            @Override
+            public IoHandler create() {
+                return new StaticIoHandler();
+            }
+        };
     protected IoAcceptor acceptor;
 
     public DefaultTcpipForwarder(ConnectionService service) {
-        this.service = service;
-        this.session = service.getSession();
+        this.service = ValidateUtils.checkNotNull(service, "No connection service", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.session = ValidateUtils.checkNotNull(service.getSession(), "No session", GenericUtils.EMPTY_OBJECT_ARRAY);
+    }
+
+    public final ConnectionService getConnectionService() {
+        return service;
     }
 
     //
@@ -70,35 +91,61 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
 
     @Override
     public synchronized SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) throws IOException {
-        if (local == null) {
-            throw new IllegalArgumentException("Local address is null");
-        }
-        if (remote == null) {
-            throw new IllegalArgumentException("Remote address is null");
-        }
-        if (local.getPort() < 0) {
-            throw new IllegalArgumentException("Invalid local port: " + local.getPort());
-        }
+        ValidateUtils.checkNotNull(local, "Local address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
+        ValidateUtils.checkNotNull(remote, "Remote address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+
         if (isClosed()) {
             throw new IllegalStateException("TcpipForwarder is closed");
         }
         if (isClosing()) {
             throw new IllegalStateException("TcpipForwarder is closing");
         }
-        SshdSocketAddress bound = doBind(local, new StaticIoHandler());
-        localToRemote.put(Integer.valueOf(bound.getPort()), remote);
-        return bound;
+
+        InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
+        int port = bound.getPort();
+        SshdSocketAddress prev;
+        synchronized(localToRemote) {
+            prev = localToRemote.put(Integer.valueOf(port), remote);
+        }
+        
+        if (prev != null) {
+            throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+        }
+
+        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+        if (log.isDebugEnabled()) {
+            log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result);
+        }
+        return result;
     }
 
     @Override
     public synchronized void stopLocalPortForwarding(SshdSocketAddress local) throws IOException {
-        if (localToRemote.remove(Integer.valueOf(local.getPort())) != null && acceptor != null) {
-            acceptor.unbind(local.toInetSocketAddress());
+        ValidateUtils.checkNotNull(local, "Local address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+
+        SshdSocketAddress bound;
+        synchronized(localToRemote) {
+            bound = localToRemote.remove(Integer.valueOf(local.getPort()));
+        }
+
+        if ((bound != null) && (acceptor != null)) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopLocalPortForwarding(" + local + ") unbind " + bound);
+            }
+            acceptor.unbind(bound.toInetSocketAddress());
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("stopLocalPortForwarding(" + local + ") no mapping/acceptor for " + bound);
+            }
         }
     }
 
     @Override
     public synchronized SshdSocketAddress startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) throws IOException {
+        ValidateUtils.checkNotNull(local, "Local address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+        ValidateUtils.checkNotNull(remote, "Remote address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+
         Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST);
         buffer.putString("tcpip-forward");
         buffer.putBoolean(true);
@@ -108,83 +155,157 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
         if (result == null) {
             throw new SshException("Tcpip forwarding request denied by server");
         }
-        int port = remote.getPort() == 0 ? result.getInt() : remote.getPort();
+        int port = (remote.getPort() == 0) ? result.getInt() : remote.getPort();
         // TODO: Is it really safe to only store the local address after the request ?
-        remoteToLocal.put(Integer.valueOf(port), local);
-        return new SshdSocketAddress(remote.getHostName(), port);
+        SshdSocketAddress prev;
+        synchronized(remoteToLocal) {
+            prev = remoteToLocal.put(Integer.valueOf(port), local);
+        }
+        
+        if (prev != null) {
+            throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+        }
+
+        SshdSocketAddress bound = new SshdSocketAddress(remote.getHostName(), port);
+        if (log.isDebugEnabled()) {
+            log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound);
+        }
+
+        return bound;
     }
 
     @Override
     public synchronized void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException {
-        if (remoteToLocal.remove(Integer.valueOf(remote.getPort())) != null) {
+        SshdSocketAddress bound;
+        synchronized(remoteToLocal) {
+            bound = remoteToLocal.remove(Integer.valueOf(remote.getPort()));
+        }
+
+        if (bound != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopRemotePortForwarding(" + remote + ") cancel forwarding to " + bound);
+            }
+
             Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST);
             buffer.putString("cancel-tcpip-forward");
             buffer.putBoolean(false);
             buffer.putString(remote.getHostName());
             buffer.putInt(remote.getPort());
             session.writePacket(buffer);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("stopRemotePortForwarding(" + remote + ") no binding found");
+            }
         }
     }
 
     @Override
     public synchronized SshdSocketAddress startDynamicPortForwarding(SshdSocketAddress local) throws IOException {
-        if (local == null) {
-            throw new IllegalArgumentException("Local address is null");
-        }
-        if (local.getPort() < 0) {
-            throw new IllegalArgumentException("Invalid local port: " + local.getPort());
-        }
+        ValidateUtils.checkNotNull(local, "Local address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
+
         if (isClosed()) {
             throw new IllegalStateException("TcpipForwarder is closed");
         }
         if (isClosing()) {
             throw new IllegalStateException("TcpipForwarder is closing");
         }
-        SocksProxy socksProxy = new SocksProxy(service);
-        SshdSocketAddress bound = doBind(local, new SocksProxy(service));
-        dynamicLocal.put(Integer.valueOf(bound.getPort()), socksProxy);
-        return bound;
+
+        SocksProxy socksProxy = new SocksProxy(service), prev;
+        InetSocketAddress bound = doBind(local, socksProxyIoHandlerFactory);
+        int port = bound.getPort();
+        synchronized(dynamicLocal) {
+            prev = dynamicLocal.put(Integer.valueOf(port), socksProxy);
+        }
+        
+        if (prev != null) {
+            throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+        }
+
+        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+        if (log.isDebugEnabled()) {
+            log.debug("startDynamicPortForwarding(" + local + "): " + result);
+        }
+        
+        return result;
     }
 
     @Override
     public synchronized void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException {
-        Closeable obj = dynamicLocal.remove(Integer.valueOf(local.getPort()));
+        Closeable obj;
+        synchronized(dynamicLocal) {
+            obj = dynamicLocal.remove(Integer.valueOf(local.getPort()));
+        }
+
         if (obj != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("stopDynamicPortForwarding(" + local + ") unbinding");
+            }
             obj.close(true);
             acceptor.unbind(local.toInetSocketAddress());
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("stopDynamicPortForwarding(" + local + ") no binding found");
+            }
         }
     }
 
     @Override
     public synchronized SshdSocketAddress getForwardedPort(int remotePort) {
-        return remoteToLocal.get(Integer.valueOf(remotePort));
+        synchronized(remoteToLocal) {
+            return remoteToLocal.get(Integer.valueOf(remotePort));
+        }
     }
 
     @Override
     public synchronized SshdSocketAddress localPortForwardingRequested(SshdSocketAddress local) throws IOException {
-        if (local == null) {
-            throw new IllegalArgumentException("Local address is null");
-        }
-        if (local.getPort() < 0) {
-            throw new IllegalArgumentException("Invalid local port: " + local.getPort());
-        }
+        ValidateUtils.checkNotNull(local, "Local address is null", GenericUtils.EMPTY_OBJECT_ARRAY);
+        ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
         
-        ForwardingFilter filter = session.getFactoryManager().getTcpipForwardingFilter();
+        FactoryManager manager = session.getFactoryManager();
+        ForwardingFilter filter = manager.getTcpipForwardingFilter();
         if ((filter == null) || (!filter.canListen(local, session))) {
             if (log.isDebugEnabled()) {
                 log.debug("localPortForwardingRequested(" + session + ")[" + local + "][haveFilter=" + (filter != null) + "] rejected");
             }
             throw new IOException("Rejected address: " + local);
         }
-        SshdSocketAddress bound = doBind(local, new StaticIoHandler());
-        localForwards.add(bound);
-        return bound;
+        InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
+        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
+        if (log.isDebugEnabled()) {
+            log.debug("localPortForwardingRequested(" + local + "): " + result);
+        }
+
+        boolean added;
+        synchronized(localForwards) {
+            // NOTE !!! it is crucial to use the bound address host name first
+            added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+        }
+        
+        if (!added) {
+            throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+        }
+        return result;
     }
 
     @Override
     public synchronized void localPortForwardingCancelled(SshdSocketAddress local) throws IOException {
-        if (localForwards.remove(local) && acceptor != null) {
-            acceptor.unbind(local.toInetSocketAddress());
+        LocalForwardingEntry entry;
+        synchronized(localForwards) {
+            if ((entry=LocalForwardingEntry.findMatchingEntry(local.getHostName(), local.getPort(), localForwards)) != null) {
+                localForwards.remove(entry);
+            }
+        }
+
+        if ((entry != null) && (acceptor != null)) {
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingCancelled(" + local + ") unbind " + entry);
+            }
+            acceptor.unbind(entry.toInetSocketAddress());
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + entry);
+            }
         }
     }
 
@@ -193,29 +314,41 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
         return builder().parallel(dynamicLocal.values()).close(acceptor).build();
     }
 
-    //
-    // Private methods
-    //
-
-    private SshdSocketAddress doBind(SshdSocketAddress address, IoHandler handler) throws IOException {
+    /**
+     * @param address The request bind address
+     * @param handlerFactory A {@link Factory} to create an {@link IoHandler} if necessary
+     * @return The {@link InetSocketAddress} to which the binding occurred
+     * @throws IOException If failed to bind
+     */
+    private InetSocketAddress doBind(SshdSocketAddress address, Factory<? extends IoHandler> handlerFactory) throws IOException {
         if (acceptor == null) {
-            acceptor = session.getFactoryManager().getIoServiceFactory().createAcceptor(handler);
+            FactoryManager manager = session.getFactoryManager();
+            IoServiceFactory factory = manager.getIoServiceFactory();
+            IoHandler handler = handlerFactory.create();
+            acceptor = factory.createAcceptor(handler);
         }
+
+        // TODO find a better way to determine the resulting bind address - what if multi-threaded calls...
         Set<SocketAddress> before = acceptor.getBoundAddresses();
         try {
-            acceptor.bind(address.toInetSocketAddress());
+            InetSocketAddress bindAddress = address.toInetSocketAddress(); 
+            acceptor.bind(bindAddress);
+
             Set<SocketAddress> after = acceptor.getBoundAddresses();
-            after.removeAll(before);
-            if (after.isEmpty()) {
-                throw new IOException("Error binding to " + address + ": no local addresses bound");
+            if (GenericUtils.size(after) > 0) {
+                after.removeAll(before);
+            }
+            if (GenericUtils.isEmpty(after)) {
+                throw new IOException("Error binding to " + address + "[" + bindAddress + "]: no local addresses bound");
             }
+
             if (after.size() > 1) {
-                throw new IOException("Multiple local addresses have been bound for " + address);
+                throw new IOException("Multiple local addresses have been bound for " + address + "[" + bindAddress + "]");
             }
-            InetSocketAddress result = (InetSocketAddress) after.iterator().next();
-            return new SshdSocketAddress(result.getHostName(), result.getPort());
+            return (InetSocketAddress) after.iterator().next();
         } catch (IOException bindErr) {
-            if (acceptor.getBoundAddresses().isEmpty()) {
+            Set<SocketAddress> after = acceptor.getBoundAddresses();
+            if (GenericUtils.isEmpty(after)) {
                 close();
             }
             throw bindErr;
@@ -232,13 +365,17 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
     //
 
     class StaticIoHandler implements IoHandler {
+        StaticIoHandler() {
+            super();
+        }
 
         @SuppressWarnings("synthetic-access")
         @Override
         public void sessionCreated(final IoSession session) throws Exception {
-            final TcpipClientChannel channel;
-            int localPort = ((InetSocketAddress) session.getLocalAddress()).getPort();
+            InetSocketAddress local = (InetSocketAddress) session.getLocalAddress(); 
+            int localPort = local.getPort();
             SshdSocketAddress remote = localToRemote.get(Integer.valueOf(localPort));
+            final TcpipClientChannel channel;
             if (remote != null) {
                 channel = new TcpipClientChannel(TcpipClientChannel.Type.Direct, session, remote);
             } else {
@@ -283,7 +420,5 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
             cause.printStackTrace();
             session.close(false);
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/forward/LocalForwardingEntry.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/LocalForwardingEntry.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/LocalForwardingEntry.java
new file mode 100644
index 0000000..5b3ca97
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/LocalForwardingEntry.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+import java.util.Collection;
+import java.util.Objects;
+
+import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class LocalForwardingEntry extends SshdSocketAddress {
+    private static final long serialVersionUID = 423661570180889621L;
+    private final String alias;
+
+    public LocalForwardingEntry(String hostName, String alias, int port) {
+        super(hostName, port);
+        this.alias = ValidateUtils.checkNotNullAndNotEmpty(alias, "No host alias", GenericUtils.EMPTY_OBJECT_ARRAY);
+    }
+
+    public String getAlias() {
+        return alias;
+    }
+
+    @Override
+    protected boolean isEquivalent(SshdSocketAddress that) {
+        if (super.isEquivalent(that) && (that instanceof LocalForwardingEntry)) {
+            LocalForwardingEntry entry = (LocalForwardingEntry) that;
+            if (Objects.equals(this.getAlias(), entry.getAlias())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() + Objects.hashCode(getAlias());
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " - " + getAlias();
+    }
+    
+    /**
+     * @param host The host - ignored if {@code null}/empty - i.e., no match reported
+     * @param port The port - ignored if non-positive - i.e., no match reported
+     * @param entries The {@link Collection} of {@link LocalForwardingEntry} to check
+     * - ignored if {@code null}/empty - i.e., no match reported
+     * @return The <U>first</U> entry whose host or alias matches the host name - case
+     * <U>sensitive</U> <B>and</B> has a matching port - {@code null} if no match found
+     */
+    public static final LocalForwardingEntry findMatchingEntry(String host, int port, Collection<? extends LocalForwardingEntry> entries) {
+        if (GenericUtils.isEmpty(host) || (port <= 0) || (GenericUtils.isEmpty(entries))) {
+            return null;
+        }
+        
+        for (LocalForwardingEntry e : entries) {
+            if ((port == e.getPort()) && (host.equals(e.getHostName()) || host.equals(e.getAlias()))) {
+                return e;
+            }
+        }
+        
+        return null;    // no match found
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index 21df217..84b96a3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -36,8 +36,8 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
 /**
  * SOCKS proxy server, supporting simple socks4/5 protocols.
- *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @see <A HREF="https://en.wikipedia.org/wiki/SOCKS">SOCKS Wikipedia</A>
  */
 public class SocksProxy extends CloseableUtils.AbstractCloseable implements IoHandler {
 
@@ -120,6 +120,9 @@ public class SocksProxy extends CloseableUtils.AbstractCloseable implements IoHa
         }
     }
 
+    /**
+     * @see <A HREF="https://en.wikipedia.org/wiki/SOCKS#SOCKS4">SOCKS4</A>
+     */
     public class Socks4 extends Proxy {
         public Socks4(IoSession session) {
             super(session);
@@ -193,10 +196,12 @@ public class SocksProxy extends CloseableUtils.AbstractCloseable implements IoHa
 
     }
 
+    /**
+     * @see <A HREF="https://en.wikipedia.org/wiki/SOCKS#SOCKS5">SOCKS5</A>
+     */
     public class Socks5 extends Proxy {
-
-        byte[] authMethods;
-        Buffer response;
+        private byte[] authMethods;
+        private Buffer response;
 
         public Socks5(IoSession session) {
             super(session);
@@ -229,10 +234,14 @@ public class SocksProxy extends CloseableUtils.AbstractCloseable implements IoHa
                     throw new IllegalStateException("Unexpected version: " + version);
                 }
                 int cmd = buffer.getByte();
-                if (cmd != 1) {
+                if (cmd != 1) { // establish a TCP/IP stream connection
                     throw new IllegalStateException("Unsupported socks command: " + cmd);
                 }
                 final int res = buffer.getByte();
+                if (res != 0) {
+                    log.debug("No zero reserved value: " + (res & 0x00FF));
+                }
+
                 int type = buffer.getByte();
                 String host;
                 if (type == 0x01) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index a9b23a5..e93bff0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -35,6 +35,7 @@ import org.apache.sshd.common.io.IoService;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  */
@@ -46,11 +47,11 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
     protected final AsynchronousChannelGroup group;
 
     protected Nio2Service(FactoryManager manager, IoHandler handler, AsynchronousChannelGroup group) {
-        log.debug("Creating {}", getClass().getSimpleName());
-        this.manager = manager;
-        this.handler = handler;
+        log.trace("Creating {}", getClass().getSimpleName());
+        this.manager = ValidateUtils.checkNotNull(manager, "No factory manager provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.handler = ValidateUtils.checkNotNull(handler, "No I/O handler provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.group = ValidateUtils.checkNotNull(group, "No async. channel group provided", GenericUtils.EMPTY_OBJECT_ARRAY);
         this.sessions = new ConcurrentHashMap<Long, IoSession>();
-        this.group = group;
     }
 
     public void dispose() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/common/util/ValidateUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/ValidateUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/ValidateUtils.java
index 735ba34..64c3c7e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/ValidateUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/ValidateUtils.java
@@ -66,7 +66,13 @@ public final class ValidateUtils {
         checkTrue(GenericUtils.length(t) > 0, message, args);
         return t;
     }
-    
+
+    public static final void checkTrue(boolean flag, String message, Object arg) {
+        if (!flag) {
+            throwIllegalArgumentException(message, arg);
+        }
+    }
+
     public static final void checkTrue(boolean flag, String message, Object ... args) {
         if (!flag) {
             throwIllegalArgumentException(message, args);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
index 2edd32c..342c8f2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
@@ -21,6 +21,7 @@ package org.apache.sshd.server.global;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.RequestHandler;
+import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -45,7 +46,9 @@ public class CancelTcpipForwardHandler extends AbstractLoggingBean implements Re
             if (log.isDebugEnabled()) {
                 log.debug("process(" + connectionService + ")[" + request + "] " + socketAddress + " reply=" + wantReply);
             }
-            connectionService.getTcpipForwarder().localPortForwardingCancelled(socketAddress);
+            
+            TcpipForwarder forwarder = connectionService.getTcpipForwarder(); 
+            forwarder.localPortForwardingCancelled(socketAddress);
 
             if (wantReply) {
                 Session session = connectionService.getSession();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
index dc6adcb..53b4ce5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
@@ -21,6 +21,7 @@ package org.apache.sshd.server.global;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.RequestHandler;
+import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -42,7 +43,8 @@ public class TcpipForwardHandler extends AbstractLoggingBean implements RequestH
             String address = buffer.getString();
             int port = buffer.getInt();
             SshdSocketAddress socketAddress = new SshdSocketAddress(address, port);
-            SshdSocketAddress bound = connectionService.getTcpipForwarder().localPortForwardingRequested(socketAddress);
+            TcpipForwarder forwarder = connectionService.getTcpipForwarder();
+            SshdSocketAddress bound = forwarder.localPortForwardingRequested(socketAddress);
             if (log.isDebugEnabled()) {
                 log.debug("process(" + connectionService + ")[" + request + "] " + socketAddress + " => " + bound + ", reply=" + wantReply);
             }
@@ -56,6 +58,7 @@ public class TcpipForwardHandler extends AbstractLoggingBean implements RequestH
             }
             return Result.Replied;
         }
+
         return Result.Unsupported;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f71181ea/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
index b4848a2..3cc2b7d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
@@ -20,9 +20,12 @@ package org.apache.sshd;
 
 import static org.apache.sshd.util.Utils.getFreePort;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -121,17 +124,20 @@ public class PortForwardingTest extends BaseTestSupport {
             session.setPortForwardingR(forwardedPort, "localhost", echoPort);
             Thread.sleep(100);
     
-            try(Socket s = new Socket("localhost", forwardedPort)) {
+            try(Socket s = new Socket("localhost", forwardedPort);
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 String  expected = getCurrentTestName();
-                byte[]  bytes = expected.getBytes();
-                s.getOutputStream().write(bytes);
-                s.getOutputStream().flush();
+                byte[]  bytes = expected.getBytes(StandardCharsets.UTF_8);
+                output.write(bytes);
+                output.flush();
 
                 byte[]  buf = new byte[bytes.length + Long.SIZE];
-                int     n = s.getInputStream().read(buf);
-                String  res = new String(buf, 0, n);
+                int     n = input.read(buf);
+                String  res = new String(buf, 0, n, StandardCharsets.UTF_8);
                 assertEquals("Mismatched data", expected, res);
             }
     
@@ -142,22 +148,59 @@ public class PortForwardingTest extends BaseTestSupport {
     }
 
     @Test
+    public void testRemoteForwardingSecondTimeInSameSession() throws Exception {
+        Session session = createSession();
+        try {
+            int forwardedPort = getFreePort();
+            session.setPortForwardingR(forwardedPort, "localhost", echoPort);
+            Thread.sleep(100L);
+            session.delPortForwardingR("localhost", forwardedPort);
+            Thread.sleep(100L);
+            session.setPortForwardingR(forwardedPort, "localhost", echoPort);
+            Thread.sleep(100L);
+    
+            try(Socket s = new Socket("localhost", forwardedPort);
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+                
+                String  expected = getCurrentTestName();
+                byte[]  bytes = expected.getBytes(StandardCharsets.UTF_8);
+                output.write(bytes);
+                output.flush();
+
+                byte[]  buf = new byte[bytes.length + Long.SIZE];
+                int     n = input.read(buf);
+                String  res = new String(buf, 0, n, StandardCharsets.UTF_8);
+                assertEquals("Mismatched data", expected, res);
+            }
+            session.delPortForwardingR("localhost", forwardedPort);
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
     public void testRemoteForwardingNative() throws Exception {
         try(ClientSession session = createNativeSession()) {
             SshdSocketAddress remote = new SshdSocketAddress("", 0);
             SshdSocketAddress local = new SshdSocketAddress("localhost", echoPort);
             SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
     
-            try(Socket s = new Socket(bound.getHostName(), bound.getPort())) {
+            try(Socket s = new Socket(bound.getHostName(), bound.getPort());
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 String  expected = getCurrentTestName();
                 byte[]  bytes = expected.getBytes();
-                s.getOutputStream().write(bytes);
-                s.getOutputStream().flush();
+                output.write(bytes);
+                output.flush();
 
                 byte[]  buf = new byte[bytes.length + Long.SIZE];
-                int     n = s.getInputStream().read(buf);
+                int     n = input.read(buf);
                 String  res = new String(buf, 0, n);
                 assertEquals("Mismatched data", expected, res);
             }
@@ -174,7 +217,10 @@ public class PortForwardingTest extends BaseTestSupport {
             SshdSocketAddress local = new SshdSocketAddress("localhost", echoPort);
             SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
 
-            try(Socket s = new Socket(bound.getHostName(), bound.getPort())) {
+            try(Socket s = new Socket(bound.getHostName(), bound.getPort());
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 String  expected = getCurrentTestName();
@@ -182,10 +228,10 @@ public class PortForwardingTest extends BaseTestSupport {
                 byte[]  buf = new byte[bytes.length + Long.SIZE];
 
                 for (int i = 0; i < 1000; i++) {
-                    s.getOutputStream().write(bytes);
-                    s.getOutputStream().flush();
+                    output.write(bytes);
+                    output.flush();
 
-                    int     n = s.getInputStream().read(buf);
+                    int     n = input.read(buf);
                     String  res = new String(buf, 0, n);
                     assertEquals("Mismatched data at iteration #" + i, expected, res);
                 }
@@ -203,17 +249,20 @@ public class PortForwardingTest extends BaseTestSupport {
             int forwardedPort = getFreePort();
             session.setPortForwardingL(forwardedPort, "localhost", echoPort);
     
-            try(Socket s = new Socket("localhost", forwardedPort)) {
+            try(Socket s = new Socket("localhost", forwardedPort);
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 String  expected = getCurrentTestName();
                 byte[]  bytes = expected.getBytes();
 
-                s.getOutputStream().write(bytes);
-                s.getOutputStream().flush();
+                output.write(bytes);
+                output.flush();
 
                 byte[]  buf = new byte[bytes.length + Long.SIZE];
-                int     n = s.getInputStream().read(buf);
+                int     n = input.read(buf);
                 String  res = new String(buf, 0, n);
                 assertEquals("Mismatched data", expected, res);
             }
@@ -231,17 +280,20 @@ public class PortForwardingTest extends BaseTestSupport {
             SshdSocketAddress remote = new SshdSocketAddress("localhost", echoPort);
             SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
 
-            try(Socket s = new Socket(bound.getHostName(), bound.getPort())) {
+            try(Socket s = new Socket(bound.getHostName(), bound.getPort());
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 String  expected = getCurrentTestName();
                 byte[]  bytes = expected.getBytes();
 
-                s.getOutputStream().write(bytes);
-                s.getOutputStream().flush();
+                output.write(bytes);
+                output.flush();
 
                 byte[]  buf = new byte[bytes.length + Long.SIZE];
-                int     n = s.getInputStream().read(buf);
+                int     n = input.read(buf);
                 String  res = new String(buf, 0, n);
                 assertEquals("Mismatched data", expected, res);
             }
@@ -277,14 +329,17 @@ public class PortForwardingTest extends BaseTestSupport {
             String  expected = getCurrentTestName();
             byte[]  bytes = expected.getBytes();
             byte[]  buf = new byte[bytes.length + Long.SIZE];
-            try(Socket s = new Socket(bound.getHostName(), bound.getPort())) {
+            try(Socket s = new Socket(bound.getHostName(), bound.getPort());
+                OutputStream output = s.getOutputStream();
+                InputStream input = s.getInputStream()) {
+
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
 
                 for (int i = 0; i < 1000; i++) {
-                    s.getOutputStream().write(bytes);
-                    s.getOutputStream().flush();
+                    output.write(bytes);
+                    output.flush();
 
-                    int     n = s.getInputStream().read(buf);
+                    int     n = input.read(buf);
                     String  res = new String(buf, 0, n);
                     assertEquals("Mismatched data at iteration #" + i, expected, res);
                 }
@@ -307,13 +362,16 @@ public class PortForwardingTest extends BaseTestSupport {
                 String  expected = getCurrentTestName();
                 byte[]  bytes = expected.getBytes();
 
-                channel.getInvertedIn().write(bytes);
-                channel.getInvertedIn().flush();
-
-                byte[]  buf = new byte[bytes.length + Long.SIZE];
-                int     n = channel.getInvertedOut().read(buf);
-                String  res = new String(buf, 0, n);
-                assertEquals("Mismatched data", expected, res);
+                try(OutputStream output = channel.getInvertedIn();
+                    InputStream input = channel.getInvertedOut()) {
+                    output.write(bytes);
+                    output.flush();
+    
+                    byte[]  buf = new byte[bytes.length + Long.SIZE];
+                    int     n = input.read(buf);
+                    String  res = new String(buf, 0, n);
+                    assertEquals("Mismatched data", expected, res);
+                }
                 channel.close(false);
             }
 
@@ -338,7 +396,7 @@ public class PortForwardingTest extends BaseTestSupport {
         
                 // 4. Make sure the NIOprocessor is not stuck
                 {
-                    Thread.sleep(1000);
+                    Thread.sleep(1000L);
                     // from here, we need to check all the threads running and find a
                     // "NioProcessor-"
                     // that is stuck on a PortForward.dispose