You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/12/09 10:19:01 UTC
httpcomponents-core git commit: HTTPCORE-563: client support for
SOCKS version 5
Repository: httpcomponents-core
Updated Branches:
refs/heads/master d6b9205e0 -> 42d992fe4
HTTPCORE-563: client support for SOCKS version 5
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/42d992fe
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/42d992fe
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/42d992fe
Branch: refs/heads/master
Commit: 42d992fe4d8fd04c4181a012dea36fef28314f98
Parents: d6b9205
Author: David Maplesden <da...@maplesden.co.nz>
Authored: Tue Nov 27 16:13:05 2018 +1300
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sun Dec 9 11:15:14 2018 +0100
----------------------------------------------------------------------
.../impl/nio/ClientHttpProtocolNegotiator.java | 32 +-
.../org/apache/hc/core5/testing/SocksProxy.java | 300 ++++++++++++++++
.../core5/testing/nio/Http1IntegrationTest.java | 6 +-
.../nio/Http1SocksProxyIntegrationTest.java | 67 ++++
.../core5/testing/nio/Http2IntegrationTest.java | 6 +-
.../nio/Http2SocksProxyIntegrationTest.java | 67 ++++
.../hc/core5/reactor/IOReactorConfig.java | 63 +++-
.../hc/core5/reactor/SingleCoreIOReactor.java | 16 +-
.../reactor/SocksProxyProtocolHandler.java | 343 +++++++++++++++++++
.../SocksProxyProtocolHandlerFactory.java | 57 +++
10 files changed, 936 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
index 003f118..6b3a97e 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
import javax.net.ssl.SSLSession;
@@ -114,6 +115,19 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
}
}
+ private void writeOutPreface(final IOSession session) throws IOException {
+ if (preface.hasRemaining()) {
+ final ByteChannel channel = session.channel();
+ channel.write(preface);
+ }
+ if (!preface.hasRemaining()) {
+ session.clearEvent(SelectionKey.OP_WRITE);
+ startHttp2(session);
+ } else {
+ session.setEvent(SelectionKey.OP_WRITE);
+ }
+ }
+
@Override
public void connected(final IOSession session) throws IOException {
switch (versionPolicy) {
@@ -133,31 +147,19 @@ public class ClientHttpProtocolNegotiator implements HttpConnectionEventHandler
if (preface == null) {
startHttp1(session);
} else {
- if (preface.hasRemaining()) {
- final ByteChannel channel = session.channel();
- channel.write(preface);
- }
- if (!preface.hasRemaining()) {
- startHttp2(session);
- }
+ writeOutPreface(session);
}
}
@Override
- public void inputReady(final IOSession session)throws IOException {
+ public void inputReady(final IOSession session) throws IOException {
outputReady(session);
}
@Override
public void outputReady(final IOSession session) throws IOException {
if (preface != null) {
- if (preface.hasRemaining()) {
- final ByteChannel channel = session.channel();
- channel.write(preface);
- }
- if (!preface.hasRemaining()) {
- startHttp2(session);
- }
+ writeOutPreface(session);
} else {
session.close(CloseMode.GRACEFUL);
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
new file mode 100644
index 0000000..2e9a81b
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java
@@ -0,0 +1,300 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Cheap and nasty SOCKS protocol version 5 proxy, recommended for use in unit tests only so we can test our SOCKS client code.
+ */
+public class SocksProxy {
+
+ private static class SocksProxyHandler {
+
+ public static final int VERSION_5 = 5;
+ public static final int COMMAND_CONNECT = 1;
+ public static final int ATYP_IPV4 = 1;
+ public static final int ATYP_DOMAINNAME = 3;
+ public static final int ATYP_IPV6 = 4;
+
+ private final SocksProxy parent;
+ private final Socket socket;
+ private volatile Socket remote;
+
+ public SocksProxyHandler(final SocksProxy parent, final Socket socket) {
+ this.parent = parent;
+ this.socket = socket;
+ }
+
+ public void start() {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final DataInputStream input = new DataInputStream(socket.getInputStream());
+ final DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ final Socket target = establishConnection(input, output);
+ remote = target;
+
+ final Thread t1 = pumpStream(input, target.getOutputStream());
+ final Thread t2 = pumpStream(target.getInputStream(), output);
+ try {
+ t1.join();
+ } catch (final InterruptedException e) {
+ }
+ try {
+ t2.join();
+ } catch (final InterruptedException e) {
+ }
+ } catch (final IOException e) {
+ } finally {
+ parent.cleanupSocksProxyHandler(SocksProxyHandler.this);
+ }
+ }
+
+ private Socket establishConnection(final DataInputStream input, final DataOutputStream output) throws IOException {
+ final int clientVersion = input.readUnsignedByte();
+ if (clientVersion != VERSION_5) {
+ throw new IOException("SOCKS implementation only supports version 5");
+ }
+ final int nMethods = input.readUnsignedByte();
+ for (int i = 0; i < nMethods; i++) {
+ input.readUnsignedByte(); // auth method
+ }
+ // response
+ output.writeByte(VERSION_5);
+ output.writeByte(0); // no auth method
+ output.flush();
+
+ input.readUnsignedByte(); // client version again
+ final int command = input.readUnsignedByte();
+ if (command != COMMAND_CONNECT) {
+ throw new IOException("SOCKS implementation only supports CONNECT command");
+ }
+ input.readUnsignedByte(); // reserved
+
+ final String targetHost;
+ final byte[] targetAddress;
+ final int addressType = input.readUnsignedByte();
+ switch (addressType) {
+ case ATYP_IPV4:
+ targetHost = null;
+ targetAddress = new byte[4];
+ for (int i = 0; i < targetAddress.length; i++) {
+ targetAddress[i] = input.readByte();
+ }
+ break;
+ case ATYP_IPV6:
+ targetHost = null;
+ targetAddress = new byte[16];
+ for (int i = 0; i < targetAddress.length; i++) {
+ targetAddress[i] = input.readByte();
+ }
+ break;
+ case ATYP_DOMAINNAME:
+ final int length = input.readUnsignedByte();
+ final StringBuffer domainname = new StringBuffer();
+ for (int i = 0; i < length; i++) {
+ domainname.append((char) input.readUnsignedByte());
+ }
+ targetHost = domainname.toString();
+ targetAddress = null;
+ break;
+ default:
+ throw new IOException("Unsupported address type: " + addressType);
+ }
+
+ final int targetPort = input.readUnsignedShort();
+ final Socket target;
+ if (targetHost != null) {
+ target = new Socket(targetHost, targetPort);
+ } else {
+ target = new Socket(InetAddress.getByAddress(targetAddress), targetPort);
+ }
+
+ output.writeByte(VERSION_5);
+ output.writeByte(0); /* success */
+ output.writeByte(0); /* reserved */
+ final byte[] localAddress = target.getLocalAddress().getAddress();
+ if (localAddress.length == 4) {
+ output.writeByte(ATYP_IPV4);
+ } else if (localAddress.length == 16) {
+ output.writeByte(ATYP_IPV6);
+ } else {
+ throw new IOException("Unsupported localAddress byte length: " + localAddress.length);
+ }
+ output.write(localAddress);
+ output.writeShort(target.getLocalPort());
+ output.flush();
+
+ return target;
+ }
+
+ private Thread pumpStream(final InputStream input, final OutputStream output) {
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final byte[] buffer = new byte[1024 * 8];
+ try {
+ while (true) {
+ final int read = input.read(buffer);
+ if (read < 0) {
+ break;
+ }
+ output.write(buffer, 0, read);
+ output.flush();
+ }
+ } catch (final IOException e) {
+ } finally {
+ shutdown();
+ }
+ }
+ });
+ t.start();
+ return t;
+ }
+
+ }).start();
+ }
+
+ public void shutdown() {
+ try {
+ this.socket.close();
+ } catch (final IOException e) {
+ }
+ if (this.remote != null) {
+ try {
+ this.remote.close();
+ } catch (final IOException e) {
+ }
+ }
+ }
+
+ }
+
+ private final int port;
+
+ private final List<SocksProxyHandler> handlers = new ArrayList<SocksProxyHandler>();
+ private ServerSocket server;
+ private Thread serverThread;
+
+ public SocksProxy() {
+ this(0);
+ }
+
+ public SocksProxy(final int port) {
+ this.port = port;
+ }
+
+ public synchronized void start() throws IOException {
+ if (this.server == null) {
+ this.server = new ServerSocket(this.port);
+ this.serverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ final Socket socket = server.accept();
+ startSocksProxyHandler(socket);
+ }
+ } catch (final IOException e) {
+ } finally {
+ if (server != null) {
+ try {
+ server.close();
+ } catch (final IOException e) {
+ }
+ server = null;
+ }
+ }
+ }
+ });
+ this.serverThread.start();
+ }
+ }
+
+ public void shutdown(final TimeValue timeout) throws InterruptedException {
+ final long waitUntil = System.currentTimeMillis() + timeout.toMillis();
+ Thread t = null;
+ synchronized (this) {
+ if (this.server != null) {
+ try {
+ this.server.close();
+ } catch (final IOException e) {
+ } finally {
+ this.server = null;
+ }
+ t = this.serverThread;
+ this.serverThread = null;
+ }
+ for (final SocksProxyHandler handler : this.handlers) {
+ handler.shutdown();
+ }
+ while (!this.handlers.isEmpty()) {
+ final long waitTime = waitUntil - System.currentTimeMillis();
+ if (waitTime > 0) {
+ wait(waitTime);
+ }
+ }
+ }
+ if (t != null) {
+ final long waitTime = waitUntil - System.currentTimeMillis();
+ if (waitTime > 0) {
+ t.join(waitTime);
+ }
+ }
+ }
+
+ protected void startSocksProxyHandler(final Socket socket) {
+ final SocksProxyHandler handler = new SocksProxyHandler(this, socket);
+ synchronized (this) {
+ this.handlers.add(handler);
+ }
+ handler.start();
+ }
+
+ protected synchronized void cleanupSocksProxyHandler(final SocksProxyHandler handler) {
+ this.handlers.remove(handler);
+ }
+
+ public SocketAddress getProxyAddress() {
+ return this.server.getLocalSocketAddress();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
index 3661a80..bb23dee 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
@@ -164,10 +164,14 @@ public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
public void setup() throws Exception {
log.debug("Starting up test client");
client = new Http1TestClient(
- IOReactorConfig.DEFAULT,
+ buildReactorConfig(),
scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null);
}
+ protected IOReactorConfig buildReactorConfig() {
+ return IOReactorConfig.DEFAULT;
+ }
+
@After
public void cleanup() throws Exception {
log.debug("Shutting down test client");
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
new file mode 100644
index 0000000..be06284
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1SocksProxyIntegrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio;
+
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.testing.SocksProxy;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class Http1SocksProxyIntegrationTest extends Http1IntegrationTest {
+
+ protected static SocksProxy PROXY;
+
+ @BeforeClass
+ public static void before() throws Throwable {
+ PROXY = new SocksProxy();
+ PROXY.start();
+ }
+
+ @AfterClass
+ public static void after() {
+ if (PROXY != null) {
+ try {
+ PROXY.shutdown(TimeValue.ofSeconds(5));
+ } catch (final Exception ignore) {
+ }
+ PROXY = null;
+ }
+ }
+
+ public Http1SocksProxyIntegrationTest(final URIScheme scheme) {
+ super(scheme);
+ }
+
+ @Override
+ protected IOReactorConfig buildReactorConfig() {
+ return IOReactorConfig.custom().setSocksProxyAddress(PROXY.getProxyAddress()).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 30a3998..a6d4b42 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -155,10 +155,14 @@ public class Http2IntegrationTest extends InternalHttp2ServerTestBase {
@Before
public void setup() throws Exception {
log.debug("Starting up test client");
- client = new Http2TestClient(IOReactorConfig.DEFAULT,
+ client = new Http2TestClient(buildReactorConfig(),
scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null);
}
+ protected IOReactorConfig buildReactorConfig() {
+ return IOReactorConfig.DEFAULT;
+ }
+
@After
public void cleanup() throws Exception {
log.debug("Shutting down test client");
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
new file mode 100644
index 0000000..9a958e2
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2SocksProxyIntegrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio;
+
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.testing.SocksProxy;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class Http2SocksProxyIntegrationTest extends Http2IntegrationTest {
+
+ protected static SocksProxy PROXY;
+
+ @BeforeClass
+ public static void before() throws Throwable {
+ PROXY = new SocksProxy();
+ PROXY.start();
+ }
+
+ @AfterClass
+ public static void after() {
+ if (PROXY != null) {
+ try {
+ PROXY.shutdown(TimeValue.ofSeconds(5));
+ } catch (final Exception ignore) {
+ }
+ PROXY = null;
+ }
+ }
+
+ public Http2SocksProxyIntegrationTest(final URIScheme scheme) {
+ super(scheme);
+ }
+
+ @Override
+ protected IOReactorConfig buildReactorConfig() {
+ return IOReactorConfig.custom().setSocksProxyAddress(PROXY.getProxyAddress()).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
index e678843..1b0c5d7 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
@@ -27,6 +27,7 @@
package org.apache.hc.core5.reactor;
+import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hc.core5.annotation.Contract;
@@ -55,6 +56,9 @@ public final class IOReactorConfig {
private final int sndBufSize;
private final int rcvBufSize;
private final int backlogSize;
+ private final SocketAddress socksProxyAddress;
+ private final String socksProxyUsername;
+ private final String socksProxyPassword;
IOReactorConfig(
final TimeValue selectInterval,
@@ -66,7 +70,10 @@ public final class IOReactorConfig {
final boolean tcpNoDelay,
final int sndBufSize,
final int rcvBufSize,
- final int backlogSize) {
+ final int backlogSize,
+ final SocketAddress socksProxyAddress,
+ final String socksProxyUsername,
+ final String socksProxyPassword) {
super();
this.selectInterval = selectInterval;
this.ioThreadCount = ioThreadCount;
@@ -78,6 +85,9 @@ public final class IOReactorConfig {
this.sndBufSize = sndBufSize;
this.rcvBufSize = rcvBufSize;
this.backlogSize = backlogSize;
+ this.socksProxyAddress = socksProxyAddress;
+ this.socksProxyUsername = socksProxyUsername;
+ this.socksProxyPassword = socksProxyPassword;
}
/**
@@ -203,6 +213,27 @@ public final class IOReactorConfig {
return backlogSize;
}
+ /**
+ * The address of the SOCKS proxy to use.
+ */
+ public SocketAddress getSocksProxyAddress() {
+ return this.socksProxyAddress;
+ }
+
+ /**
+ * The username to provide to the SOCKS proxy for username/password authentication.
+ */
+ public String getSocksProxyUsername() {
+ return this.socksProxyUsername;
+ }
+
+ /**
+ * The password to provide to the SOCKS proxy for username/password authentication.
+ */
+ public String getSocksProxyPassword() {
+ return this.socksProxyPassword;
+ }
+
public static Builder custom() {
return new Builder();
}
@@ -219,7 +250,10 @@ public final class IOReactorConfig {
.setTcpNoDelay(config.isTcpNoDelay())
.setSndBufSize(config.getSndBufSize())
.setRcvBufSize(config.getRcvBufSize())
- .setBacklogSize(config.getBacklogSize());
+ .setBacklogSize(config.getBacklogSize())
+ .setSocksProxyAddress(config.getSocksProxyAddress())
+ .setSocksProxyUsername(config.getSocksProxyUsername())
+ .setSocksProxyPassword(config.getSocksProxyPassword());
}
public static class Builder {
@@ -261,6 +295,9 @@ public final class IOReactorConfig {
private int sndBufSize;
private int rcvBufSize;
private int backlogSize;
+ private SocketAddress socksProxyAddress;
+ private String socksProxyUsername;
+ private String socksProxyPassword;
Builder() {
this.selectInterval = TimeValue.ofSeconds(1);
@@ -273,6 +310,9 @@ public final class IOReactorConfig {
this.sndBufSize = 0;
this.rcvBufSize = 0;
this.backlogSize = 0;
+ this.socksProxyAddress = null;
+ this.socksProxyUsername = null;
+ this.socksProxyPassword = null;
}
public Builder setSelectInterval(final TimeValue selectInterval) {
@@ -335,6 +375,21 @@ public final class IOReactorConfig {
return this;
}
+ public Builder setSocksProxyAddress(final SocketAddress socksProxyAddress) {
+ this.socksProxyAddress = socksProxyAddress;
+ return this;
+ }
+
+ public Builder setSocksProxyUsername(final String socksProxyUsername) {
+ this.socksProxyUsername = socksProxyUsername;
+ return this;
+ }
+
+ public Builder setSocksProxyPassword(final String socksProxyPassword) {
+ this.socksProxyPassword = socksProxyPassword;
+ return this;
+ }
+
public IOReactorConfig build() {
return new IOReactorConfig(
selectInterval != null ? selectInterval : TimeValue.ofSeconds(1),
@@ -344,7 +399,8 @@ public final class IOReactorConfig {
TimeValue.defaultsToNegativeOneMillisecond(soLinger),
soKeepAlive,
tcpNoDelay,
- sndBufSize, rcvBufSize, backlogSize);
+ sndBufSize, rcvBufSize, backlogSize,
+ socksProxyAddress, socksProxyUsername, socksProxyPassword);
}
}
@@ -362,6 +418,7 @@ public final class IOReactorConfig {
.append(", sndBufSize=").append(this.sndBufSize)
.append(", rcvBufSize=").append(this.rcvBufSize)
.append(", backlogSize=").append(this.backlogSize)
+ .append(", socksProxyAddress=").append(this.socksProxyAddress)
.append("]");
return builder.toString();
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index 65a51a3..412ce30 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -313,7 +313,21 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
sock.bind(sessionRequest.localAddress);
}
- final boolean connected = socketChannel.connect(sessionRequest.remoteAddress);
+
+ final SocketAddress targetAddress;
+ final IOEventHandlerFactory eventHandlerFactory;
+ if (this.reactorConfig.getSocksProxyAddress() != null) {
+ targetAddress = this.reactorConfig.getSocksProxyAddress();
+ eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
+ sessionRequest.remoteAddress,
+ this.reactorConfig.getSocksProxyUsername(),
+ this.reactorConfig.getSocksProxyPassword(),
+ this.eventHandlerFactory);
+ } else {
+ targetAddress = sessionRequest.remoteAddress;
+ eventHandlerFactory = this.eventHandlerFactory;
+ }
+ final boolean connected = socketChannel.connect(targetAddress);
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
new file mode 100644
index 0000000..7d791fd
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandler.java
@@ -0,0 +1,343 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.UnresolvedAddressException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Implements the client side of SOCKS protocol version 5 as per https://tools.ietf.org/html/rfc1928. Supports SOCKS username/password
+ * authentication as per https://tools.ietf.org/html/rfc1929.
+ */
+final class SocksProxyProtocolHandler implements IOEventHandler {
+
+ private static final int MAX_COMMAND_CONNECT_LENGTH = 22;
+
+ private static final byte CLIENT_VERSION = 5;
+
+ private static final byte NO_AUTHENTICATION_REQUIRED = 0;
+
+ private static final byte USERNAME_PASSWORD = 2;
+
+ private static final byte USERNAME_PASSWORD_VERSION = 1;
+
+ private static final byte SUCCESS = 0;
+
+ private static final byte COMMAND_CONNECT = 1;
+
+ private static final byte ATYP_IPV4 = 1;
+
+ private static final byte ATYP_DOMAINNAME = 3;
+
+ private static final byte ATYP_IPV6 = 4;
+
+ private static enum State {
+ SEND_AUTH, RECEIVE_AUTH_METHOD, SEND_USERNAME_PASSWORD, RECEIVE_AUTH, SEND_CONNECT, RECEIVE_RESPONSE_CODE, RECEIVE_ADDRESS_TYPE, RECEIVE_ADDRESS, COMPLETE
+ }
+
+ private final ProtocolIOSession ioSession;
+ private final Object attachment;
+ private final InetSocketAddress targetAddress;
+ private final String username;
+ private final String password;
+ private final IOEventHandlerFactory eventHandlerFactory;
+
+ // a 32 byte buffer is enough for all usual SOCKS negotiations, we expand it if necessary during the processing
+ private ByteBuffer buffer = ByteBuffer.allocate(32);
+ private State state = State.SEND_AUTH;
+ private int remainingResponseSize = -1;
+
+ SocksProxyProtocolHandler(final ProtocolIOSession ioSession, final Object attachment, final InetSocketAddress targetAddress,
+ final String username, final String password, final IOEventHandlerFactory eventHandlerFactory) {
+ this.ioSession = ioSession;
+ this.attachment = attachment;
+ this.targetAddress = targetAddress;
+ this.username = username;
+ this.password = password;
+ this.eventHandlerFactory = eventHandlerFactory;
+ }
+
+ @Override
+ public void connected(final IOSession session) throws IOException {
+ this.buffer.put(CLIENT_VERSION);
+ this.buffer.put((byte) 1);
+ this.buffer.put(NO_AUTHENTICATION_REQUIRED);
+ this.buffer.flip();
+ session.setEventMask(SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ public void outputReady(final IOSession session) throws IOException {
+ switch (this.state) {
+ case SEND_AUTH:
+ if (writeAndPrepareRead(session.channel(), 2)) {
+ session.setEventMask(SelectionKey.OP_READ);
+ this.state = State.RECEIVE_AUTH_METHOD;
+ }
+ break;
+ case SEND_USERNAME_PASSWORD:
+ if (writeAndPrepareRead(session.channel(), 2)) {
+ session.setEventMask(SelectionKey.OP_READ);
+ this.state = State.RECEIVE_AUTH;
+ }
+ break;
+ case SEND_CONNECT:
+ if (writeAndPrepareRead(session.channel(), 2)) {
+ session.setEventMask(SelectionKey.OP_READ);
+ this.state = State.RECEIVE_RESPONSE_CODE;
+ }
+ break;
+ case RECEIVE_AUTH_METHOD:
+ case RECEIVE_AUTH:
+ case RECEIVE_ADDRESS:
+ case RECEIVE_ADDRESS_TYPE:
+ case RECEIVE_RESPONSE_CODE:
+ session.setEventMask(SelectionKey.OP_READ);
+ break;
+ case COMPLETE:
+ break;
+ }
+ }
+
+ @Override
+ public void inputReady(final IOSession session) throws IOException {
+ switch (this.state) {
+ case RECEIVE_AUTH_METHOD:
+ if (fillBuffer(session.channel())) {
+ this.buffer.flip();
+ final byte serverVersion = this.buffer.get();
+ final byte serverMethod = this.buffer.get();
+ if (serverVersion != CLIENT_VERSION) {
+ throw new IOException("SOCKS server returned unsupported version: " + serverVersion);
+ }
+ if (serverMethod == USERNAME_PASSWORD) {
+ this.buffer.clear();
+ setBufferLimit(this.username.length() + this.password.length() + 3);
+ this.buffer.put(USERNAME_PASSWORD_VERSION);
+ this.buffer.put((byte) this.username.length());
+ this.buffer.put(this.username.getBytes(StandardCharsets.ISO_8859_1));
+ this.buffer.put((byte) this.password.length());
+ this.buffer.put(this.password.getBytes(StandardCharsets.ISO_8859_1));
+ session.setEventMask(SelectionKey.OP_WRITE);
+ this.state = State.SEND_USERNAME_PASSWORD;
+ } else if (serverMethod == NO_AUTHENTICATION_REQUIRED) {
+ prepareConnectCommand();
+ session.setEventMask(SelectionKey.OP_WRITE);
+ this.state = State.SEND_CONNECT;
+ } else {
+ throw new IOException("SOCKS server return unsupported authentication method: " + serverMethod);
+ }
+ }
+ break;
+ case RECEIVE_AUTH:
+ if (fillBuffer(session.channel())) {
+ this.buffer.flip();
+ this.buffer.get(); // skip server auth version
+ final byte status = this.buffer.get();
+ if (status != SUCCESS) {
+ throw new IOException("Authentication failed for external SOCKS proxy");
+ }
+ prepareConnectCommand();
+ session.setEventMask(SelectionKey.OP_WRITE);
+ this.state = State.SEND_CONNECT;
+ }
+ break;
+ case RECEIVE_RESPONSE_CODE:
+ if (fillBuffer(session.channel())) {
+ this.buffer.flip();
+ final byte serverVersion = this.buffer.get();
+ final byte responseCode = this.buffer.get();
+ if (serverVersion != CLIENT_VERSION) {
+ throw new IOException("SOCKS server returned unsupported version: " + serverVersion);
+ }
+ if (responseCode != SUCCESS) {
+ throw new IOException("SOCKS server was unable to establish connection returned error code: " + responseCode);
+ }
+ this.buffer.compact();
+ this.buffer.limit(3);
+ this.state = State.RECEIVE_ADDRESS_TYPE;
+ // deliberate fall-through
+ } else {
+ break;
+ }
+ case RECEIVE_ADDRESS_TYPE:
+ if (fillBuffer(session.channel())) {
+ this.buffer.flip();
+ this.buffer.get(); // reserved byte that has no purpose
+ final byte aType = this.buffer.get();
+ final int addressSize;
+ if (aType == ATYP_IPV4) {
+ addressSize = 4;
+ } else if (aType == ATYP_IPV6) {
+ addressSize = 16;
+ } else if (aType == ATYP_DOMAINNAME) {
+ // mask with 0xFF to convert to unsigned byte value
+ addressSize = this.buffer.get() & 0xFF;
+ } else {
+ throw new IOException("SOCKS server returned unsupported address type: " + aType);
+ }
+ this.remainingResponseSize = addressSize + 2;
+ this.buffer.compact();
+ // make sure we only read what we need to, don't read too much
+ this.buffer.limit(this.remainingResponseSize);
+ this.state = State.RECEIVE_ADDRESS;
+ // deliberate fall-through
+ } else {
+ break;
+ }
+ case RECEIVE_ADDRESS:
+ if (fillBuffer(session.channel())) {
+ this.buffer.clear();
+ this.state = State.COMPLETE;
+ final IOEventHandler newHandler = this.eventHandlerFactory.createHandler(this.ioSession, this.attachment);
+ this.ioSession.upgrade(newHandler);
+ newHandler.connected(this.ioSession);
+ }
+ break;
+ case SEND_AUTH:
+ case SEND_USERNAME_PASSWORD:
+ case SEND_CONNECT:
+ session.setEventMask(SelectionKey.OP_WRITE);
+ break;
+ case COMPLETE:
+ break;
+ }
+ }
+
+ private void prepareConnectCommand() throws IOException {
+ final InetAddress address = this.targetAddress.getAddress();
+ final int port = this.targetAddress.getPort();
+ if (address == null || port == 0) {
+ throw new UnresolvedAddressException();
+ }
+
+ this.buffer.clear();
+ setBufferLimit(MAX_COMMAND_CONNECT_LENGTH);
+ this.buffer.put(CLIENT_VERSION);
+ this.buffer.put(COMMAND_CONNECT);
+ this.buffer.put((byte) 0); // reserved
+ if (address instanceof Inet4Address) {
+ this.buffer.put(ATYP_IPV4);
+ this.buffer.put(address.getAddress());
+ } else if (address instanceof Inet6Address) {
+ this.buffer.put(ATYP_IPV6);
+ this.buffer.put(address.getAddress());
+ } else {
+ throw new IOException("Unsupported remote address class: " + address.getClass().getName());
+ }
+ this.buffer.putShort((short) port);
+ this.buffer.flip();
+ }
+
+ private void setBufferLimit(final int newLimit) {
+ if (this.buffer.capacity() < newLimit) {
+ final ByteBuffer newBuffer = ByteBuffer.allocate(newLimit);
+ this.buffer.flip();
+ newBuffer.put(this.buffer);
+ this.buffer = newBuffer;
+ } else {
+ this.buffer.limit(newLimit);
+ }
+ }
+
+ private boolean writeAndPrepareRead(final ByteChannel channel, final int readSize) throws IOException {
+ if (writeBuffer(channel)) {
+ this.buffer.clear();
+ setBufferLimit(readSize);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean writeBuffer(final ByteChannel channel) throws IOException {
+ if (this.buffer.hasRemaining()) {
+ channel.write(this.buffer);
+ }
+ return !this.buffer.hasRemaining();
+ }
+
+ private boolean fillBuffer(final ByteChannel channel) throws IOException {
+ if (this.buffer.hasRemaining()) {
+ channel.read(this.buffer);
+ }
+ return !this.buffer.hasRemaining();
+ }
+
+ @Override
+ public void timeout(final IOSession session, final Timeout timeout) throws IOException {
+ exception(session, SocketTimeoutExceptionFactory.create(timeout));
+ }
+
+ @Override
+ public void exception(final IOSession session, final Exception cause) {
+ try {
+ cleanupRequests(cause);
+ } finally {
+ session.close(CloseMode.IMMEDIATE);
+ }
+ }
+
+ @Override
+ public void disconnected(final IOSession session) {
+ cleanupRequests(new ConnectionClosedException());
+ }
+
+ private void cleanupRequests(final Exception cause) {
+ while (true) {
+ final Command command = ioSession.poll();
+ if (command != null) {
+ if (command instanceof RequestExecutionCommand) {
+ final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command;
+ final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+ exchangeHandler.failed(cause);
+ exchangeHandler.releaseResources();
+ } else {
+ command.cancel();
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/42d992fe/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
new file mode 100644
index 0000000..d2fca8e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SocksProxyProtocolHandlerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class SocksProxyProtocolHandlerFactory implements IOEventHandlerFactory {
+
+ private final InetSocketAddress targetAddress;
+ private final String username;
+ private final String password;
+ private final IOEventHandlerFactory eventHandlerFactory;
+
+ public SocksProxyProtocolHandlerFactory(final SocketAddress targetAddress, final String username, final String password, final IOEventHandlerFactory eventHandlerFactory) throws IOException {
+ this.eventHandlerFactory = eventHandlerFactory;
+ this.username = username;
+ this.password = password;
+ if (targetAddress instanceof InetSocketAddress) {
+ this.targetAddress = (InetSocketAddress) targetAddress;
+ } else {
+ throw new IOException("Unsupported target address type for SOCKS proxy connection: " + targetAddress.getClass());
+ }
+ }
+
+ @Override
+ public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) {
+ return new SocksProxyProtocolHandler(ioSession, attachment, this.targetAddress, this.username, this.password, this.eventHandlerFactory);
+ }
+
+}