You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/12/09 10:19:01 UTC

httpcomponents-core git commit: HTTPCORE-563: client support for SOCKS version 5

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master d6b9205e0 -> 42d992fe4


HTTPCORE-563: client support for SOCKS version 5


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/42d992fe
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/42d992fe
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/42d992fe

Branch: refs/heads/master
Commit: 42d992fe4d8fd04c4181a012dea36fef28314f98
Parents: d6b9205
Author: David Maplesden <da...@maplesden.co.nz>
Authored: Tue Nov 27 16:13:05 2018 +1300
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sun Dec 9 11:15:14 2018 +0100

----------------------------------------------------------------------
 .../impl/nio/ClientHttpProtocolNegotiator.java  |  32 +-
 .../org/apache/hc/core5/testing/SocksProxy.java | 300 ++++++++++++++++
 .../core5/testing/nio/Http1IntegrationTest.java |   6 +-
 .../nio/Http1SocksProxyIntegrationTest.java     |  67 ++++
 .../core5/testing/nio/Http2IntegrationTest.java |   6 +-
 .../nio/Http2SocksProxyIntegrationTest.java     |  67 ++++
 .../hc/core5/reactor/IOReactorConfig.java       |  63 +++-
 .../hc/core5/reactor/SingleCoreIOReactor.java   |  16 +-
 .../reactor/SocksProxyProtocolHandler.java      | 343 +++++++++++++++++++
 .../SocksProxyProtocolHandlerFactory.java       |  57 +++
 10 files changed, 936 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 003f118..6b3a97e 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
 
 import javax.net.ssl.SSLSession;
 
@@ -114,6 +115,19 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
         }
     }
 
+    /**
+     * Writes whatever is left of the connection preface to the session channel.
+     * <p>
+     * If bytes are still pending after the write attempt, interest in
+     * {@link SelectionKey#OP_WRITE} is registered on the session. Once the preface
+     * has been written out completely, the write interest is cleared and the
+     * session is handed over to {@code startHttp2}.
+     *
+     * @param session the I/O session the preface is written to.
+     * @throws IOException if the channel write or the protocol start fails.
+     */
+    private void writeOutPreface(final IOSession session) throws IOException {
+        if (preface.hasRemaining()) {
+            final ByteChannel out = session.channel();
+            out.write(preface);
+        }
+        if (preface.hasRemaining()) {
+            // Partial write: ask to be notified when the channel is writable again.
+            session.setEvent(SelectionKey.OP_WRITE);
+            return;
+        }
+        session.clearEvent(SelectionKey.OP_WRITE);
+        startHttp2(session);
+    }
+
     @Override
     public void connected(final IOSession session) throws IOException {
         switch (versionPolicy) {
@@ -133,31 +147,19 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
         if (preface == null) {
             startHttp1(session);
         } else {
-            if (preface.hasRemaining()) {
-                final ByteChannel channel = session.channel();
-                channel.write(preface);
-            }
-            if (!preface.hasRemaining()) {
-                startHttp2(session);
-            }
+            writeOutPreface(session);
         }
     }
 
     @Override
-    public void inputReady(final IOSession session)throws IOException  {
+    public void inputReady(final IOSession session) throws IOException  {
         outputReady(session);
     }
 
     @Override
     public void outputReady(final IOSession session) throws IOException {
         if (preface != null) {
-            if (preface.hasRemaining()) {
-                final ByteChannel channel = session.channel();
-                channel.write(preface);
-            }
-            if (!preface.hasRemaining()) {
-                startHttp2(session);
-            }
+            writeOutPreface(session);
         } else {
             session.close(CloseMode.GRACEFUL);
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
new file mode 100644
index 0000000..2e9a81b
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
@@ -0,0 +1,300 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Cheap and nasty SOCKS protocol version 5 proxy, recommended for use in unit tests only so we can test our SOCKS client code.
+ */
+public class SocksProxy {
+
+    /**
+     * Serves a single client connection: performs the SOCKS 5 handshake, opens the
+     * connection to the requested target and then relays bytes in both directions
+     * until either side closes.
+     */
+    private static class SocksProxyHandler {
+
+        public static final int VERSION_5 = 5;
+        public static final int COMMAND_CONNECT = 1;
+        public static final int ATYP_IPV4 = 1;
+        public static final int ATYP_DOMAINNAME = 3;
+        public static final int ATYP_IPV6 = 4;
+
+        private final SocksProxy parent;
+        private final Socket socket;
+        // Written by the handler thread, read by whichever thread calls shutdown().
+        private volatile Socket remote;
+
+        public SocksProxyHandler(final SocksProxy parent, final Socket socket) {
+            this.parent = parent;
+            this.socket = socket;
+        }
+
+        /**
+         * Starts a new thread that negotiates the SOCKS connection and relays traffic.
+         * When the thread finishes, for whatever reason, both sockets are closed and
+         * the handler is deregistered from the parent proxy.
+         */
+        public void start() {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        final DataInputStream input = new DataInputStream(socket.getInputStream());
+                        final DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+                        final Socket target = establishConnection(input, output);
+
+                        final Thread t1 = pumpStream(input, target.getOutputStream());
+                        final Thread t2 = pumpStream(target.getInputStream(), output);
+                        // Wait for both pumps even if interrupted, then restore the flag.
+                        boolean interrupted = false;
+                        try {
+                            t1.join();
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                        try {
+                            t2.join();
+                        } catch (final InterruptedException e) {
+                            interrupted = true;
+                        }
+                        if (interrupted) {
+                            Thread.currentThread().interrupt();
+                        }
+                    } catch (final IOException ignore) {
+                        // Best-effort test proxy: a failed handshake or relay just ends this connection.
+                    } finally {
+                        // Close both sockets on every exit path so that a failed handshake does not leak them.
+                        SocksProxyHandler.this.shutdown();
+                        parent.cleanupSocksProxyHandler(SocksProxyHandler.this);
+                    }
+                }
+
+                /**
+                 * Runs the server side of the SOCKS 5 negotiation: method selection (always
+                 * answered with "no authentication"), then a CONNECT request addressed by
+                 * IPv4, IPv6 or domain name, followed by the success reply.
+                 *
+                 * @return the connected socket to the requested target.
+                 * @throws IOException on protocol violations, unsupported requests or I/O failure.
+                 */
+                private Socket establishConnection(final DataInputStream input, final DataOutputStream output) throws IOException {
+                    final int clientVersion = input.readUnsignedByte();
+                    if (clientVersion != VERSION_5) {
+                        throw new IOException("SOCKS implementation only supports version 5");
+                    }
+                    final int nMethods = input.readUnsignedByte();
+                    for (int i = 0; i < nMethods; i++) {
+                        input.readUnsignedByte(); // auth method
+                    }
+                    // response
+                    output.writeByte(VERSION_5);
+                    output.writeByte(0); // no auth method
+                    output.flush();
+
+                    input.readUnsignedByte(); // client version again
+                    final int command = input.readUnsignedByte();
+                    if (command != COMMAND_CONNECT) {
+                        throw new IOException("SOCKS implementation only supports CONNECT command");
+                    }
+                    input.readUnsignedByte(); // reserved
+
+                    final String targetHost;
+                    final byte[] targetAddress;
+                    final int addressType = input.readUnsignedByte();
+                    switch (addressType) {
+                        case ATYP_IPV4:
+                            targetHost = null;
+                            targetAddress = new byte[4];
+                            input.readFully(targetAddress);
+                            break;
+                        case ATYP_IPV6:
+                            targetHost = null;
+                            targetAddress = new byte[16];
+                            input.readFully(targetAddress);
+                            break;
+                        case ATYP_DOMAINNAME:
+                            final int length = input.readUnsignedByte();
+                            final StringBuilder domainname = new StringBuilder(length);
+                            for (int i = 0; i < length; i++) {
+                                domainname.append((char) input.readUnsignedByte());
+                            }
+                            targetHost = domainname.toString();
+                            targetAddress = null;
+                            break;
+                        default:
+                            throw new IOException("Unsupported address type: " + addressType);
+                    }
+
+                    final int targetPort = input.readUnsignedShort();
+                    final Socket target;
+                    if (targetHost != null) {
+                        target = new Socket(targetHost, targetPort);
+                    } else {
+                        target = new Socket(InetAddress.getByAddress(targetAddress), targetPort);
+                    }
+                    // Publish the socket right away so that shutdown() closes it even if
+                    // writing the reply below fails.
+                    remote = target;
+
+                    output.writeByte(VERSION_5);
+                    output.writeByte(0); /* success */
+                    output.writeByte(0); /* reserved */
+                    final byte[] localAddress = target.getLocalAddress().getAddress();
+                    if (localAddress.length == 4) {
+                        output.writeByte(ATYP_IPV4);
+                    } else if (localAddress.length == 16) {
+                        output.writeByte(ATYP_IPV6);
+                    } else {
+                        throw new IOException("Unsupported localAddress byte length: " + localAddress.length);
+                    }
+                    output.write(localAddress);
+                    output.writeShort(target.getLocalPort());
+                    output.flush();
+
+                    return target;
+                }
+
+                /**
+                 * Starts a thread copying {@code input} to {@code output} until end of stream
+                 * or an I/O error, after which the whole handler is shut down.
+                 *
+                 * @return the started pump thread.
+                 */
+                private Thread pumpStream(final InputStream input, final OutputStream output) {
+                    final Thread t = new Thread(new Runnable() {
+                        @Override
+                        public void run() {
+                            final byte[] buffer = new byte[1024 * 8];
+                            try {
+                                while (true) {
+                                    final int read = input.read(buffer);
+                                    if (read < 0) {
+                                        break;
+                                    }
+                                    output.write(buffer, 0, read);
+                                    output.flush();
+                                }
+                            } catch (final IOException ignore) {
+                                // Expected when either socket is closed; the finally block tears down the rest.
+                            } finally {
+                                shutdown();
+                            }
+                        }
+                    });
+                    t.start();
+                    return t;
+                }
+
+            }).start();
+        }
+
+        /**
+         * Closes the client socket and, if already connected, the target socket.
+         * Safe to call repeatedly and from any thread.
+         */
+        public void shutdown() {
+            try {
+                this.socket.close();
+            } catch (final IOException ignore) {
+                // nothing useful can be done if close fails
+            }
+            final Socket target = this.remote;
+            if (target != null) {
+                try {
+                    target.close();
+                } catch (final IOException ignore) {
+                    // nothing useful can be done if close fails
+                }
+            }
+        }
+
+    }
+
+    private final int port;
+
+    // Guarded by "this".
+    private final List<SocksProxyHandler> handlers = new ArrayList<SocksProxyHandler>();
+    // Guarded by "this"; null while the proxy is not running.
+    private ServerSocket server;
+    private Thread serverThread;
+
+    /**
+     * Creates a proxy that listens on an ephemeral port chosen at {@link #start()}.
+     */
+    public SocksProxy() {
+        this(0);
+    }
+
+    /**
+     * Creates a proxy that listens on the given port.
+     *
+     * @param port the port to bind to, {@code 0} for an ephemeral port.
+     */
+    public SocksProxy(final int port) {
+        this.port = port;
+    }
+
+    /**
+     * Binds the server socket and starts the accept thread. Does nothing if the
+     * proxy is already running.
+     *
+     * @throws IOException if the server socket cannot be opened.
+     */
+    public synchronized void start() throws IOException {
+        if (this.server != null) {
+            return;
+        }
+        // The accept thread works on its own reference so that it never touches a
+        // server socket created by a later start() call.
+        final ServerSocket serverSocket = new ServerSocket(this.port);
+        this.server = serverSocket;
+        this.serverThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        final Socket socket = serverSocket.accept();
+                        startSocksProxyHandler(socket);
+                    }
+                } catch (final IOException ignore) {
+                    // accept() fails once the server socket has been closed; that ends the loop
+                } finally {
+                    try {
+                        serverSocket.close();
+                    } catch (final IOException ignore) {
+                        // nothing useful can be done if close fails
+                    }
+                    synchronized (SocksProxy.this) {
+                        if (server == serverSocket) {
+                            server = null;
+                        }
+                    }
+                }
+            }
+        });
+        this.serverThread.start();
+    }
+
+    /**
+     * Stops accepting connections, closes all active connections and waits up to
+     * {@code timeout} for the handler threads and the accept thread to finish.
+     * Returns once everything has stopped or the timeout has elapsed, whichever
+     * comes first.
+     *
+     * @param timeout the maximum time to wait for the proxy threads to terminate.
+     * @throws InterruptedException if the calling thread is interrupted while waiting.
+     */
+    public void shutdown(final TimeValue timeout) throws InterruptedException {
+        final long waitUntil = System.currentTimeMillis() + timeout.toMillis();
+        Thread t = null;
+        synchronized (this) {
+            if (this.server != null) {
+                try {
+                    this.server.close();
+                } catch (final IOException ignore) {
+                    // nothing useful can be done if close fails
+                } finally {
+                    this.server = null;
+                }
+                t = this.serverThread;
+                this.serverThread = null;
+            }
+            for (final SocksProxyHandler handler : this.handlers) {
+                handler.shutdown();
+            }
+            while (!this.handlers.isEmpty()) {
+                final long waitTime = waitUntil - System.currentTimeMillis();
+                if (waitTime <= 0) {
+                    // Timed out: stop waiting instead of spinning while holding the
+                    // monitor, which would also block cleanupSocksProxyHandler().
+                    break;
+                }
+                wait(waitTime);
+            }
+        }
+        if (t != null) {
+            final long waitTime = waitUntil - System.currentTimeMillis();
+            if (waitTime > 0) {
+                t.join(waitTime);
+            }
+        }
+    }
+
+    /**
+     * Registers and starts a handler for a newly accepted client connection.
+     *
+     * @param socket the accepted client socket.
+     */
+    protected void startSocksProxyHandler(final Socket socket) {
+        final SocksProxyHandler handler = new SocksProxyHandler(this, socket);
+        synchronized (this) {
+            this.handlers.add(handler);
+        }
+        handler.start();
+    }
+
+    /**
+     * Deregisters a finished handler and wakes up a pending
+     * {@link #shutdown(TimeValue)} so it can re-check whether all handlers are gone.
+     *
+     * @param handler the handler that has terminated.
+     */
+    protected synchronized void cleanupSocksProxyHandler(final SocksProxyHandler handler) {
+        this.handlers.remove(handler);
+        notifyAll();
+    }
+
+    /**
+     * Returns the address the proxy is listening on.
+     *
+     * @return the local address of the server socket.
+     * @throws IllegalStateException if the proxy is not currently running.
+     */
+    public synchronized SocketAddress getProxyAddress() {
+        if (this.server == null) {
+            throw new IllegalStateException("SOCKS proxy is not running");
+        }
+        return this.server.getLocalSocketAddress();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
index 3661a80..bb23dee 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
@@ -164,10 +164,14 @@ public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
     public void setup() throws Exception {
         log.debug("Starting up test client");
         client = new Http1TestClient(
-                IOReactorConfig.DEFAULT,
+                buildReactorConfig(),
                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null);
     }
 
+    /**
+     * Supplies the I/O reactor configuration passed to the test client in {@code setup()}.
+     * Returns {@link IOReactorConfig#DEFAULT}; subclasses may override it to run the
+     * same tests with a different reactor configuration.
+     */
+    protected IOReactorConfig buildReactorConfig() {
+        return IOReactorConfig.DEFAULT;
+    }
+
     @After
     public void cleanup() throws Exception {
         log.debug("Shutting down test client");

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
new file mode 100644
index 0000000..be06284
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio;
+
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.testing.SocksProxy;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Runs the tests inherited from {@link Http1IntegrationTest} with the client
+ * reactor configured to connect through a local {@link SocksProxy}.
+ */
+public class Http1SocksProxyIntegrationTest extends Http1IntegrationTest {
+
+    // Single proxy instance shared by all tests of this class.
+    protected static SocksProxy PROXY;
+
+    /**
+     * Starts the shared SOCKS proxy before any test of this class runs.
+     */
+    @BeforeClass
+    public static void before() throws Throwable {
+        PROXY = new SocksProxy();
+        PROXY.start();
+    }
+
+    /**
+     * Shuts the shared SOCKS proxy down after the last test, waiting up to five seconds.
+     */
+    @AfterClass
+    public static void after() {
+        if (PROXY != null) {
+            try {
+                PROXY.shutdown(TimeValue.ofSeconds(5));
+            } catch (final Exception ignore) {
+                // best-effort teardown: a failed shutdown must not fail the test class
+            }
+            PROXY = null;
+        }
+    }
+
+    public Http1SocksProxyIntegrationTest(final URIScheme scheme) {
+        super(scheme);
+    }
+
+    /**
+     * Builds a reactor configuration whose SOCKS proxy address is that of the shared proxy.
+     */
+    @Override
+    protected IOReactorConfig buildReactorConfig() {
+        return IOReactorConfig.custom().setSocksProxyAddress(PROXY.getProxyAddress()).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 30a3998..a6d4b42 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -155,10 +155,14 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
     @Before
     public void setup() throws Exception {
         log.debug("Starting up test client");
-        client = new Http2TestClient(IOReactorConfig.DEFAULT,
+        client = new Http2TestClient(buildReactorConfig(),
                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null);
     }
 
+    /**
+     * Supplies the I/O reactor configuration passed to the test client in {@code setup()}.
+     * Returns {@link IOReactorConfig#DEFAULT}; subclasses may override it to run the
+     * same tests with a different reactor configuration.
+     */
+    protected IOReactorConfig buildReactorConfig() {
+        return IOReactorConfig.DEFAULT;
+    }
+
     @After
     public void cleanup() throws Exception {
         log.debug("Shutting down test client");

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
new file mode 100644
index 0000000..9a958e2
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio;
+
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.testing.SocksProxy;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Runs the tests inherited from {@link Http2IntegrationTest} with the client
+ * reactor configured to connect through a local {@link SocksProxy}.
+ */
+public class Http2SocksProxyIntegrationTest extends Http2IntegrationTest {
+
+    // Single proxy instance shared by all tests of this class.
+    protected static SocksProxy PROXY;
+
+    /**
+     * Starts the shared SOCKS proxy before any test of this class runs.
+     */
+    @BeforeClass
+    public static void before() throws Throwable {
+        PROXY = new SocksProxy();
+        PROXY.start();
+    }
+
+    /**
+     * Shuts the shared SOCKS proxy down after the last test, waiting up to five seconds.
+     */
+    @AfterClass
+    public static void after() {
+        if (PROXY != null) {
+            try {
+                PROXY.shutdown(TimeValue.ofSeconds(5));
+            } catch (final Exception ignore) {
+                // best-effort teardown: a failed shutdown must not fail the test class
+            }
+            PROXY = null;
+        }
+    }
+
+    public Http2SocksProxyIntegrationTest(final URIScheme scheme) {
+        super(scheme);
+    }
+
+    /**
+     * Builds a reactor configuration whose SOCKS proxy address is that of the shared proxy.
+     */
+    @Override
+    protected IOReactorConfig buildReactorConfig() {
+        return IOReactorConfig.custom().setSocksProxyAddress(PROXY.getProxyAddress()).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
index e678843..1b0c5d7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
@@ -27,6 +27,7 @@
 
 package org.apache.hc.core5.reactor;
 
+import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.core5.annotation.Contract;
@@ -55,6 +56,9 @@ public final class IOReactorConfig {
     private final int sndBufSize;
     private final int rcvBufSize;
     private final int backlogSize;
+    private final SocketAddress socksProxyAddress;
+    private final String socksProxyUsername;
+    private final String socksProxyPassword;
 
     IOReactorConfig(
             final TimeValue selectInterval,
@@ -66,7 +70,10 @@ public final class IOReactorConfig {
             final boolean tcpNoDelay,
             final int sndBufSize,
             final int rcvBufSize,
-            final int backlogSize) {
+            final int backlogSize,
+            final SocketAddress socksProxyAddress,
+            final String socksProxyUsername,
+            final String socksProxyPassword) {
         super();
         this.selectInterval = selectInterval;
         this.ioThreadCount = ioThreadCount;
@@ -78,6 +85,9 @@ public final class IOReactorConfig {
         this.sndBufSize = sndBufSize;
         this.rcvBufSize = rcvBufSize;
         this.backlogSize = backlogSize;
+        this.socksProxyAddress = socksProxyAddress;
+        this.socksProxyUsername = socksProxyUsername;
+        this.socksProxyPassword = socksProxyPassword;
     }
 
     /**
@@ -203,6 +213,27 @@ public final class IOReactorConfig {
         return backlogSize;
     }
 
+    /**
+     * Returns the address of the SOCKS proxy to use for outgoing connections.
+     *
+     * @return the SOCKS proxy address, or {@code null} if none has been configured.
+     */
+    public SocketAddress getSocksProxyAddress() {
+        return this.socksProxyAddress;
+    }
+
+    /**
+     * Returns the username to provide to the SOCKS proxy for username/password authentication.
+     *
+     * @return the SOCKS proxy username, or {@code null} if none has been configured.
+     */
+    public String getSocksProxyUsername() {
+        return this.socksProxyUsername;
+    }
+
+    /**
+     * Returns the password to provide to the SOCKS proxy for username/password authentication.
+     *
+     * @return the SOCKS proxy password, or {@code null} if none has been configured.
+     */
+    public String getSocksProxyPassword() {
+        return this.socksProxyPassword;
+    }
+
     public static Builder custom() {
         return new Builder();
     }
@@ -219,7 +250,10 @@ public final class IOReactorConfig {
             .setTcpNoDelay(config.isTcpNoDelay())
             .setSndBufSize(config.getSndBufSize())
             .setRcvBufSize(config.getRcvBufSize())
-            .setBacklogSize(config.getBacklogSize());
+            .setBacklogSize(config.getBacklogSize())
+            .setSocksProxyAddress(config.getSocksProxyAddress())
+            .setSocksProxyUsername(config.getSocksProxyUsername())
+            .setSocksProxyPassword(config.getSocksProxyPassword());
     }
 
     public static class Builder {
@@ -261,6 +295,9 @@ public final class IOReactorConfig {
         private int sndBufSize;
         private int rcvBufSize;
         private int backlogSize;
+        private SocketAddress socksProxyAddress;
+        private String socksProxyUsername;
+        private String socksProxyPassword;
 
         Builder() {
             this.selectInterval = TimeValue.ofSeconds(1);
@@ -273,6 +310,9 @@ public final class IOReactorConfig {
             this.sndBufSize = 0;
             this.rcvBufSize = 0;
             this.backlogSize = 0;
+            this.socksProxyAddress = null;
+            this.socksProxyUsername = null;
+            this.socksProxyPassword = null;
         }
 
         public Builder setSelectInterval(final TimeValue selectInterval) {
@@ -335,6 +375,21 @@ public final class IOReactorConfig {
             return this;
         }
 
+        /**
+         * Sets the address of the SOCKS proxy to use.
+         *
+         * @param socksProxyAddress the proxy address; {@code null} (the default) means no proxy.
+         * @return this builder.
+         */
+        public Builder setSocksProxyAddress(final SocketAddress socksProxyAddress) {
+            this.socksProxyAddress = socksProxyAddress;
+            return this;
+        }
+
+        /**
+         * Sets the username to provide to the SOCKS proxy for username/password authentication.
+         *
+         * @param socksProxyUsername the username; {@code null} by default.
+         * @return this builder.
+         */
+        public Builder setSocksProxyUsername(final String socksProxyUsername) {
+            this.socksProxyUsername = socksProxyUsername;
+            return this;
+        }
+
+        /**
+         * Sets the password to provide to the SOCKS proxy for username/password authentication.
+         *
+         * @param socksProxyPassword the password; {@code null} by default.
+         * @return this builder.
+         */
+        public Builder setSocksProxyPassword(final String socksProxyPassword) {
+            this.socksProxyPassword = socksProxyPassword;
+            return this;
+        }
+
         public IOReactorConfig build() {
             return new IOReactorConfig(
                     selectInterval != null ? selectInterval : TimeValue.ofSeconds(1),
@@ -344,7 +399,8 @@ public final class IOReactorConfig {
                     TimeValue.defaultsToNegativeOneMillisecond(soLinger),
                     soKeepAlive,
                     tcpNoDelay,
-                    sndBufSize, rcvBufSize, backlogSize);
+                    sndBufSize, rcvBufSize, backlogSize,
+                    socksProxyAddress, socksProxyUsername, socksProxyPassword);
         }
 
     }
@@ -362,6 +418,7 @@ public final class IOReactorConfig {
                 .append(", sndBufSize=").append(this.sndBufSize)
                 .append(", rcvBufSize=").append(this.rcvBufSize)
                 .append(", backlogSize=").append(this.backlogSize)
+                .append(", socksProxyAddress=").append(this.socksProxyAddress)
                 .append("]");
         return builder.toString();
     }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index 65a51a3..412ce30 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -313,7 +313,21 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
             sock.bind(sessionRequest.localAddress);
         }
-        final boolean connected = socketChannel.connect(sessionRequest.remoteAddress);
+
+        final SocketAddress targetAddress;
+        final IOEventHandlerFactory eventHandlerFactory;
+        if (this.reactorConfig.getSocksProxyAddress() != null) {
+            targetAddress = this.reactorConfig.getSocksProxyAddress();
+            eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
+                    sessionRequest.remoteAddress,
+                    this.reactorConfig.getSocksProxyUsername(),
+                    this.reactorConfig.getSocksProxyPassword(),
+                    this.eventHandlerFactory);
+        } else {
+            targetAddress = sessionRequest.remoteAddress;
+            eventHandlerFactory = this.eventHandlerFactory;
+        }
+        final boolean connected = socketChannel.connect(targetAddress);
         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
new file mode 100644
index 0000000..7d791fd
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
@@ -0,0 +1,343 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.UnresolvedAddressException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Implements the client side of SOCKS protocol version 5 as per https://tools.ietf.org/html/rfc1928. Supports SOCKS username/password
+ * authentication as per https://tools.ietf.org/html/rfc1929.
+ * <p>
+ * The handler runs a small state machine over a single reusable buffer: it sends the method selection message, optionally performs
+ * the username/password sub-negotiation, sends the CONNECT command and consumes the proxy's reply. Once the reply has been read in
+ * full, the session is upgraded to the handler produced by the wrapped {@link IOEventHandlerFactory}.
+ */
+final class SocksProxyProtocolHandler implements IOEventHandler {
+
+    private static final int MAX_COMMAND_CONNECT_LENGTH = 22;
+
+    // RFC 1929 encodes the user name and password lengths in a single unsigned byte each
+    private static final int MAX_CREDENTIAL_LENGTH = 255;
+
+    private static final byte CLIENT_VERSION = 5;
+
+    private static final byte NO_AUTHENTICATION_REQUIRED = 0;
+
+    private static final byte USERNAME_PASSWORD = 2;
+
+    private static final byte USERNAME_PASSWORD_VERSION = 1;
+
+    private static final byte SUCCESS = 0;
+
+    private static final byte COMMAND_CONNECT = 1;
+
+    private static final byte ATYP_IPV4 = 1;
+
+    private static final byte ATYP_DOMAINNAME = 3;
+
+    private static final byte ATYP_IPV6 = 4;
+
+    private enum State {
+        SEND_AUTH,
+        RECEIVE_AUTH_METHOD,
+        SEND_USERNAME_PASSWORD,
+        RECEIVE_AUTH,
+        SEND_CONNECT,
+        RECEIVE_RESPONSE_CODE,
+        RECEIVE_ADDRESS_TYPE,
+        RECEIVE_ADDRESS,
+        COMPLETE
+    }
+
+    private final ProtocolIOSession ioSession;
+    private final Object attachment;
+    private final InetSocketAddress targetAddress;
+    private final String username;
+    private final String password;
+    private final IOEventHandlerFactory eventHandlerFactory;
+
+    // a 32 byte buffer is enough for all usual SOCKS negotiations, we expand it if necessary during the processing
+    private ByteBuffer buffer = ByteBuffer.allocate(32);
+    private State state = State.SEND_AUTH;
+    private int remainingResponseSize = -1;
+
+    /**
+     * @param ioSession the session that gets upgraded once the SOCKS handshake completes.
+     * @param attachment the attachment passed on to the wrapped handler factory.
+     * @param targetAddress the address the proxy is asked to connect to.
+     * @param username the SOCKS user name, may be {@code null} when no authentication is wanted.
+     * @param password the SOCKS password, may be {@code null} when no authentication is wanted.
+     * @param eventHandlerFactory the factory creating the handler that takes over after the handshake.
+     */
+    SocksProxyProtocolHandler(final ProtocolIOSession ioSession, final Object attachment, final InetSocketAddress targetAddress,
+            final String username, final String password, final IOEventHandlerFactory eventHandlerFactory) {
+        this.ioSession = ioSession;
+        this.attachment = attachment;
+        this.targetAddress = targetAddress;
+        this.username = username;
+        this.password = password;
+        this.eventHandlerFactory = eventHandlerFactory;
+    }
+
+    /**
+     * Prepares the method selection message and asks to be notified when the channel is writable.
+     * Username/password authentication is only offered when both credentials are available.
+     */
+    @Override
+    public void connected(final IOSession session) throws IOException {
+        this.buffer.put(CLIENT_VERSION);
+        if (hasCredentials()) {
+            // the proxy can only select a method the client has offered, so both must be listed
+            this.buffer.put((byte) 2);
+            this.buffer.put(NO_AUTHENTICATION_REQUIRED);
+            this.buffer.put(USERNAME_PASSWORD);
+        } else {
+            this.buffer.put((byte) 1);
+            this.buffer.put(NO_AUTHENTICATION_REQUIRED);
+        }
+        this.buffer.flip();
+        session.setEventMask(SelectionKey.OP_WRITE);
+    }
+
+    /**
+     * Writes the pending message of the current SEND_* state; once it has been written completely
+     * the buffer is prepared for the two byte reply and the handler switches to reading.
+     */
+    @Override
+    public void outputReady(final IOSession session) throws IOException {
+        switch (this.state) {
+            case SEND_AUTH:
+                if (writeAndPrepareRead(session.channel(), 2)) {
+                    session.setEventMask(SelectionKey.OP_READ);
+                    this.state = State.RECEIVE_AUTH_METHOD;
+                }
+                break;
+            case SEND_USERNAME_PASSWORD:
+                if (writeAndPrepareRead(session.channel(), 2)) {
+                    session.setEventMask(SelectionKey.OP_READ);
+                    this.state = State.RECEIVE_AUTH;
+                }
+                break;
+            case SEND_CONNECT:
+                if (writeAndPrepareRead(session.channel(), 2)) {
+                    session.setEventMask(SelectionKey.OP_READ);
+                    this.state = State.RECEIVE_RESPONSE_CODE;
+                }
+                break;
+            case RECEIVE_AUTH_METHOD:
+            case RECEIVE_AUTH:
+            case RECEIVE_ADDRESS:
+            case RECEIVE_ADDRESS_TYPE:
+            case RECEIVE_RESPONSE_CODE:
+                session.setEventMask(SelectionKey.OP_READ);
+                break;
+            case COMPLETE:
+                break;
+        }
+    }
+
+    /**
+     * Reads and interprets the proxy's reply for the current RECEIVE_* state and prepares the next message.
+     *
+     * @throws IOException if the proxy answers with an unsupported version, method or address type, rejects
+     *                     the authentication or the connect request, or closes the connection prematurely.
+     */
+    @Override
+    public void inputReady(final IOSession session) throws IOException {
+        switch (this.state) {
+            case RECEIVE_AUTH_METHOD:
+                if (fillBuffer(session.channel())) {
+                    this.buffer.flip();
+                    final byte serverVersion = this.buffer.get();
+                    final byte serverMethod = this.buffer.get();
+                    if (serverVersion != CLIENT_VERSION) {
+                        throw new IOException("SOCKS server returned unsupported version: " + serverVersion);
+                    }
+                    if (serverMethod == USERNAME_PASSWORD) {
+                        prepareUsernamePassword();
+                        session.setEventMask(SelectionKey.OP_WRITE);
+                        this.state = State.SEND_USERNAME_PASSWORD;
+                    } else if (serverMethod == NO_AUTHENTICATION_REQUIRED) {
+                        prepareConnectCommand();
+                        session.setEventMask(SelectionKey.OP_WRITE);
+                        this.state = State.SEND_CONNECT;
+                    } else {
+                        throw new IOException("SOCKS server return unsupported authentication method: " + serverMethod);
+                    }
+                }
+                break;
+            case RECEIVE_AUTH:
+                if (fillBuffer(session.channel())) {
+                    this.buffer.flip();
+                    this.buffer.get(); // skip server auth version
+                    final byte status = this.buffer.get();
+                    if (status != SUCCESS) {
+                        throw new IOException("Authentication failed for external SOCKS proxy");
+                    }
+                    prepareConnectCommand();
+                    session.setEventMask(SelectionKey.OP_WRITE);
+                    this.state = State.SEND_CONNECT;
+                }
+                break;
+            case RECEIVE_RESPONSE_CODE:
+                if (fillBuffer(session.channel())) {
+                    this.buffer.flip();
+                    final byte serverVersion = this.buffer.get();
+                    final byte responseCode = this.buffer.get();
+                    if (serverVersion != CLIENT_VERSION) {
+                        throw new IOException("SOCKS server returned unsupported version: " + serverVersion);
+                    }
+                    if (responseCode != SUCCESS) {
+                        throw new IOException("SOCKS server was unable to establish connection returned error code: " + responseCode);
+                    }
+                    this.buffer.compact();
+                    // reserved byte, address type and the first byte of the bound address (or the domain name length)
+                    this.buffer.limit(3);
+                    this.state = State.RECEIVE_ADDRESS_TYPE;
+                    // deliberate fall-through
+                } else {
+                    break;
+                }
+            case RECEIVE_ADDRESS_TYPE:
+                if (fillBuffer(session.channel())) {
+                    this.buffer.flip();
+                    this.buffer.get(); // reserved byte that has no purpose
+                    final byte aType = this.buffer.get();
+                    final int addressSize;
+                    if (aType == ATYP_IPV4) {
+                        addressSize = 4;
+                    } else if (aType == ATYP_IPV6) {
+                        addressSize = 16;
+                    } else if (aType == ATYP_DOMAINNAME) {
+                        // mask with 0xFF to convert to unsigned byte value
+                        addressSize = this.buffer.get() & 0xFF;
+                    } else {
+                        throw new IOException("SOCKS server returned unsupported address type: " + aType);
+                    }
+                    // address bytes plus the two byte port
+                    this.remainingResponseSize = addressSize + 2;
+                    this.buffer.compact();
+                    // make sure we only read what we need to, don't read too much; a domain name can be
+                    // larger than the current buffer, so grow it instead of setting the limit directly
+                    setBufferLimit(this.remainingResponseSize);
+                    this.state = State.RECEIVE_ADDRESS;
+                    // deliberate fall-through
+                } else {
+                    break;
+                }
+            case RECEIVE_ADDRESS:
+                if (fillBuffer(session.channel())) {
+                    this.buffer.clear();
+                    this.state = State.COMPLETE;
+                    final IOEventHandler newHandler = this.eventHandlerFactory.createHandler(this.ioSession, this.attachment);
+                    this.ioSession.upgrade(newHandler);
+                    newHandler.connected(this.ioSession);
+                }
+                break;
+            case SEND_AUTH:
+            case SEND_USERNAME_PASSWORD:
+            case SEND_CONNECT:
+                session.setEventMask(SelectionKey.OP_WRITE);
+                break;
+            case COMPLETE:
+                break;
+        }
+    }
+
+    /** Tells whether both a user name and a password are available for the username/password method. */
+    private boolean hasCredentials() {
+        return this.username != null && this.password != null;
+    }
+
+    /**
+     * Fills the buffer with the username/password request and leaves it ready to be written.
+     *
+     * @throws IOException if no credentials are available or a credential does not fit the one byte length field.
+     */
+    private void prepareUsernamePassword() throws IOException {
+        if (!hasCredentials()) {
+            throw new IOException("SOCKS server requested username/password authentication but no credentials are configured");
+        }
+        final byte[] usernameBytes = this.username.getBytes(StandardCharsets.ISO_8859_1);
+        final byte[] passwordBytes = this.password.getBytes(StandardCharsets.ISO_8859_1);
+        if (usernameBytes.length > MAX_CREDENTIAL_LENGTH || passwordBytes.length > MAX_CREDENTIAL_LENGTH) {
+            throw new IOException("SOCKS username and password must not exceed " + MAX_CREDENTIAL_LENGTH + " bytes each");
+        }
+        this.buffer.clear();
+        setBufferLimit(usernameBytes.length + passwordBytes.length + 3);
+        this.buffer.put(USERNAME_PASSWORD_VERSION);
+        this.buffer.put((byte) usernameBytes.length);
+        this.buffer.put(usernameBytes);
+        this.buffer.put((byte) passwordBytes.length);
+        this.buffer.put(passwordBytes);
+        // switch the buffer to draining mode, otherwise nothing would be written
+        this.buffer.flip();
+    }
+
+    /**
+     * Fills the buffer with the CONNECT command for the target address and leaves it ready to be written.
+     *
+     * @throws UnresolvedAddressException if the target address is unresolved or its port is 0.
+     * @throws IOException if the target address is neither an IPv4 nor an IPv6 address.
+     */
+    private void prepareConnectCommand() throws IOException {
+        final InetAddress address = this.targetAddress.getAddress();
+        final int port = this.targetAddress.getPort();
+        if (address == null || port == 0) {
+            throw new UnresolvedAddressException();
+        }
+
+        this.buffer.clear();
+        setBufferLimit(MAX_COMMAND_CONNECT_LENGTH);
+        this.buffer.put(CLIENT_VERSION);
+        this.buffer.put(COMMAND_CONNECT);
+        this.buffer.put((byte) 0); // reserved
+        if (address instanceof Inet4Address) {
+            this.buffer.put(ATYP_IPV4);
+            this.buffer.put(address.getAddress());
+        } else if (address instanceof Inet6Address) {
+            this.buffer.put(ATYP_IPV6);
+            this.buffer.put(address.getAddress());
+        } else {
+            throw new IOException("Unsupported remote address class: " + address.getClass().getName());
+        }
+        this.buffer.putShort((short) port);
+        this.buffer.flip();
+    }
+
+    /**
+     * Sets the buffer limit, replacing the buffer with a larger one when the requested limit exceeds
+     * its capacity. Bytes between index 0 and the current position are carried over to the new buffer.
+     */
+    private void setBufferLimit(final int newLimit) {
+        if (this.buffer.capacity() < newLimit) {
+            final ByteBuffer newBuffer = ByteBuffer.allocate(newLimit);
+            this.buffer.flip();
+            newBuffer.put(this.buffer);
+            this.buffer = newBuffer;
+        } else {
+            this.buffer.limit(newLimit);
+        }
+    }
+
+    /**
+     * Writes the pending buffer content; when everything has been written the buffer is reset to receive
+     * exactly {@code readSize} bytes.
+     *
+     * @return {@code true} if the buffer was written completely.
+     */
+    private boolean writeAndPrepareRead(final ByteChannel channel, final int readSize) throws IOException {
+        if (writeBuffer(channel)) {
+            this.buffer.clear();
+            setBufferLimit(readSize);
+            return true;
+        }
+        return false;
+    }
+
+    /** Writes as much of the buffer as the channel accepts; returns {@code true} when nothing is left to write. */
+    private boolean writeBuffer(final ByteChannel channel) throws IOException {
+        if (this.buffer.hasRemaining()) {
+            channel.write(this.buffer);
+        }
+        return !this.buffer.hasRemaining();
+    }
+
+    /**
+     * Reads from the channel into the buffer; returns {@code true} when the buffer has been filled up to its limit.
+     *
+     * @throws ConnectionClosedException if the channel reports end-of-stream before the expected bytes arrived.
+     */
+    private boolean fillBuffer(final ByteChannel channel) throws IOException {
+        if (this.buffer.hasRemaining()) {
+            if (channel.read(this.buffer) < 0) {
+                // the proxy closed the connection in the middle of the handshake
+                throw new ConnectionClosedException();
+            }
+        }
+        return !this.buffer.hasRemaining();
+    }
+
+    @Override
+    public void timeout(final IOSession session, final Timeout timeout) throws IOException {
+        exception(session, SocketTimeoutExceptionFactory.create(timeout));
+    }
+
+    /** Fails all queued commands with the given cause and closes the session immediately. */
+    @Override
+    public void exception(final IOSession session, final Exception cause) {
+        try {
+            cleanupRequests(cause);
+        } finally {
+            session.close(CloseMode.IMMEDIATE);
+        }
+    }
+
+    @Override
+    public void disconnected(final IOSession session) {
+        cleanupRequests(new ConnectionClosedException());
+    }
+
+    /**
+     * Drains the session's command queue: request execution commands have their exchange handler failed
+     * with the given cause and released, all other commands are cancelled.
+     */
+    private void cleanupRequests(final Exception cause) {
+        while (true) {
+            final Command command = ioSession.poll();
+            if (command != null) {
+                if (command instanceof RequestExecutionCommand) {
+                    final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command;
+                    final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+                    exchangeHandler.failed(cause);
+                    exchangeHandler.releaseResources();
+                } else {
+                    command.cancel();
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
new file mode 100644
index 0000000..d2fca8e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * {@link IOEventHandlerFactory} that creates {@link SocksProxyProtocolHandler} instances for a fixed
+ * target address and set of credentials, passing on the wrapped factory that the created handler is given.
+ */
+public class SocksProxyProtocolHandlerFactory implements IOEventHandlerFactory {
+
+    private final InetSocketAddress targetAddress;
+    private final String username;
+    private final String password;
+    private final IOEventHandlerFactory eventHandlerFactory;
+
+    /**
+     * @param targetAddress the address the SOCKS proxy is asked to connect to; must be an {@link InetSocketAddress}.
+     * @param username the SOCKS user name handed to the created handlers.
+     * @param password the SOCKS password handed to the created handlers.
+     * @param eventHandlerFactory the factory handed to the created handlers.
+     * @throws IOException if {@code targetAddress} is {@code null} or not an {@link InetSocketAddress}.
+     */
+    public SocksProxyProtocolHandlerFactory(final SocketAddress targetAddress, final String username, final String password, final IOEventHandlerFactory eventHandlerFactory) throws IOException {
+        this.eventHandlerFactory = eventHandlerFactory;
+        this.username = username;
+        this.password = password;
+        if (targetAddress instanceof InetSocketAddress) {
+            this.targetAddress = (InetSocketAddress) targetAddress;
+        } else {
+            // instanceof is false for null as well, so avoid dereferencing the address blindly
+            throw new IOException("Unsupported target address type for SOCKS proxy connection: "
+                    + (targetAddress != null ? targetAddress.getClass() : null));
+        }
+    }
+
+    @Override
+    public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) {
+        return new SocksProxyProtocolHandler(ioSession, attachment, this.targetAddress, this.username, this.password, this.eventHandlerFactory);
+    }
+
+}