You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2020/03/07 22:04:37 UTC
[activemq-artemis] 01/03: ARTEMIS-1194 Add SOCKS support
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f1474ce7c8cecfa004d35552dabfe2d12c045896
Author: Andrius Dagys <an...@r3.com>
AuthorDate: Wed May 24 17:16:37 2017 +0100
ARTEMIS-1194 Add SOCKS support
Add a Netty SOCKS proxy handler during channel initialisation to allow
Artemis to communicate via a SOCKS proxy. Supports SOCKS versions 4a and 5.
Even if enabled in configuration, the proxy will not be used when the
target host is a loopback address.
---
.../core/remoting/impl/netty/NettyConnector.java | 58 ++++++++++++++
.../remoting/impl/netty/TransportConstants.java | 32 +++++++-
artemis-features/src/main/resources/features.xml | 3 +
.../remoting/impl/netty/NettyConnectorTest.java | 90 ++++++++++++++++++++++
4 files changed, 182 insertions(+), 1 deletion(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index ecccf00..682cb8a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -93,6 +93,10 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.ssl.SslContext;
+import io.netty.handler.codec.socksx.SocksVersion;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.proxy.Socks4ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ResourceLeakDetector;
@@ -192,6 +196,18 @@ public class NettyConnector extends AbstractConnector {
// will be handled by the server's http server.
private boolean httpUpgradeEnabled;
+ private boolean proxyEnabled;
+
+ private String proxyHost;
+
+ private int proxyPort;
+
+ private SocksVersion proxyVersion;
+
+ private String proxyUsername;
+
+ private String proxyPassword;
+
private boolean useServlet;
private String host;
@@ -326,6 +342,18 @@ public class NettyConnector extends AbstractConnector {
httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
+ proxyEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_ENABLED_PROP_NAME, TransportConstants.DEFAULT_PROXY_ENABLED, configuration);
+ if (proxyEnabled) {
+ proxyHost = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_HOST_PROP_NAME, TransportConstants.DEFAULT_PROXY_HOST, configuration);
+ proxyPort = ConfigurationHelper.getIntProperty(TransportConstants.PROXY_PORT_PROP_NAME, TransportConstants.DEFAULT_PROXY_PORT, configuration);
+
+ int socksVersionNumber = ConfigurationHelper.getIntProperty(TransportConstants.PROXY_VERSION_PROP_NAME, TransportConstants.DEFAULT_PROXY_VERSION, configuration);
+ proxyVersion = SocksVersion.valueOf((byte) socksVersionNumber);
+
+ proxyUsername = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_USERNAME_PROP_NAME, TransportConstants.DEFAULT_PROXY_USERNAME, configuration);
+ proxyPassword = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_PROXY_PASSWORD, configuration);
+ }
+
remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration);
remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration);
@@ -533,6 +561,26 @@ public class NettyConnector extends AbstractConnector {
@Override
public void initChannel(Channel channel) throws Exception {
final ChannelPipeline pipeline = channel.pipeline();
+
+ if (proxyEnabled && !isTargetLocalHost()) {
+ InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
+ ProxyHandler proxyHandler;
+ switch (proxyVersion) {
+ case SOCKS5:
+ proxyHandler = new Socks5ProxyHandler(proxyAddress, proxyUsername, proxyPassword);
+ break;
+ case SOCKS4a:
+ proxyHandler = new Socks4ProxyHandler(proxyAddress, proxyUsername);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown SOCKS proxy version");
+ }
+
+ channel.pipeline().addLast(proxyHandler);
+
+ logger.debug("Using a SOCKS proxy at " + proxyHost + ":" + proxyPort);
+ }
+
if (sslEnabled && !useServlet) {
SSLEngine engine;
@@ -1228,6 +1276,16 @@ public class NettyConnector extends AbstractConnector {
return result;
}
+ private boolean isTargetLocalHost() {
+ try {
+ InetAddress address = InetAddress.getByName(host);
+ return address.isLoopbackAddress();
+ } catch (UnknownHostException e) {
+ ActiveMQClientLogger.LOGGER.error("Cannot resolve host", e);
+ }
+ return false;
+ }
+
@Override
public void finalize() throws Throwable {
close();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index b1cd541..5b715dc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -20,6 +20,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.util.Version;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.jboss.logging.Logger;
@@ -164,6 +165,18 @@ public class TransportConstants {
public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+ public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
+
+ public static final String PROXY_HOST_PROP_NAME = "socksHost";
+
+ public static final String PROXY_PORT_PROP_NAME = "socksPort";
+
+ public static final String PROXY_VERSION_PROP_NAME = "socksVersion";
+
+ public static final String PROXY_USERNAME_PROP_NAME = "socksUsername";
+
+ public static final String PROXY_PASSWORD_PROP_NAME = "socksPassword";
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final String DEFAULT_SSL_KRB5_CONFIG = null;
@@ -311,6 +324,18 @@ public class TransportConstants {
* When running on a test suite, we need it to be 0, You should see a property on the main pom.xml */
public static final int DEFAULT_SHUTDOWN_TIMEOUT = parseDefaultVariable("DEFAULT_SHUTDOWN_TIMEOUT", 3_000);
+ public static final boolean DEFAULT_PROXY_ENABLED = false;
+
+ public static final String DEFAULT_PROXY_HOST = null;
+
+ public static final int DEFAULT_PROXY_PORT = 0;
+
+ public static final byte DEFAULT_PROXY_VERSION = SocksVersion.SOCKS5.byteValue();
+
+ public static final String DEFAULT_PROXY_USERNAME = null;
+
+ public static final String DEFAULT_PROXY_PASSWORD = null;
+
private static int parseDefaultVariable(String variableName, int defaultValue) {
try {
String variable = System.getProperty(TransportConstants.class.getName() + "." + variableName);
@@ -324,7 +349,6 @@ public class TransportConstants {
return defaultValue;
}
-
static {
Set<String> allowableAcceptorKeys = new HashSet<>();
allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
@@ -424,6 +448,12 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
+ allowableConnectorKeys.add(TransportConstants.PROXY_ENABLED_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.PROXY_HOST_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.PROXY_PORT_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.PROXY_VERSION_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.PROXY_USERNAME_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.PROXY_PASSWORD_PROP_NAME);
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT);
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index eb79e75..24b06e9 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -34,7 +34,10 @@
<bundle>mvn:io.netty/netty-transport/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-buffer/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
+ <bundle>mvn:io.netty/netty-codec-socks/${netty.version}</bundle>
+ <bundle>mvn:io.netty/netty-codec-http/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
+ <bundle>mvn:io.netty/netty-handler-proxy/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index ffd2cd4..d31f722 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
+import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -23,6 +24,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.proxy.Socks5ProxyHandler;
+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -394,4 +397,91 @@ public class NettyConnectorTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testSocksProxyHandlerAdded() throws Exception {
+ BufferHandler handler = new BufferHandler() {
+ @Override
+ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
+ }
+ };
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, InetAddress.getLocalHost().getHostAddress());
+ params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+ params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+
+ ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
+ @Override
+ public void connectionException(final Object connectionID, final ActiveMQException me) {
+ }
+
+ @Override
+ public void connectionDestroyed(final Object connectionID) {
+ }
+
+ @Override
+ public void connectionCreated(final ActiveMQComponent component,
+ final Connection connection,
+ final ClientProtocolManager protocol) {
+ }
+
+ @Override
+ public void connectionReadyForWrites(Object connectionID, boolean ready) {
+ }
+ };
+
+ NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
+
+ connector.start();
+ Assert.assertTrue(connector.isStarted());
+
+ ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
+ Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class));
+
+ connector.close();
+ Assert.assertFalse(connector.isStarted());
+ }
+
+ @Test
+ public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception {
+ BufferHandler handler = new BufferHandler() {
+ @Override
+ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
+ }
+ };
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+ params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+
+ ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
+ @Override
+ public void connectionException(final Object connectionID, final ActiveMQException me) {
+ }
+
+ @Override
+ public void connectionDestroyed(final Object connectionID) {
+ }
+
+ @Override
+ public void connectionCreated(final ActiveMQComponent component,
+ final Connection connection,
+ final ClientProtocolManager protocol) {
+ }
+
+ @Override
+ public void connectionReadyForWrites(Object connectionID, boolean ready) {
+ }
+ };
+
+ NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
+
+ connector.start();
+ Assert.assertTrue(connector.isStarted());
+
+ ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
+ Assert.assertNull(pipeline.get(Socks5ProxyHandler.class));
+
+ connector.close();
+ Assert.assertFalse(connector.isStarted());
+ }
}