You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2020/03/07 22:04:37 UTC

[activemq-artemis] 01/03: ARTEMIS-1194 Add SOCKS support

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f1474ce7c8cecfa004d35552dabfe2d12c045896
Author: Andrius Dagys <an...@r3.com>
AuthorDate: Wed May 24 17:16:37 2017 +0100

    ARTEMIS-1194 Add SOCKS support
    
    Add a Netty SOCKS proxy handler during channel initialisation to allow
    Artemis to communicate via a SOCKS proxy. Supports SOCKS version 4a & 5.
    Even if enabled in configuration, the proxy will not be used when the
    target host is a loopback address.
---
 .../core/remoting/impl/netty/NettyConnector.java   | 58 ++++++++++++++
 .../remoting/impl/netty/TransportConstants.java    | 32 +++++++-
 artemis-features/src/main/resources/features.xml   |  3 +
 .../remoting/impl/netty/NettyConnectorTest.java    | 90 ++++++++++++++++++++++
 4 files changed, 182 insertions(+), 1 deletion(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index ecccf00..682cb8a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -93,6 +93,10 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
 import io.netty.handler.codec.http.cookie.Cookie;
 import io.netty.handler.ssl.SslContext;
+import io.netty.handler.codec.socksx.SocksVersion;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.proxy.Socks4ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.AttributeKey;
 import io.netty.util.ResourceLeakDetector;
@@ -192,6 +196,18 @@ public class NettyConnector extends AbstractConnector {
    // will be handled by the server's http server.
    private boolean httpUpgradeEnabled;
 
+   private boolean proxyEnabled;
+
+   private String proxyHost;
+
+   private int proxyPort;
+
+   private SocksVersion proxyVersion;
+
+   private String proxyUsername;
+
+   private String proxyPassword;
+
    private boolean useServlet;
 
    private String host;
@@ -326,6 +342,18 @@ public class NettyConnector extends AbstractConnector {
 
       httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
 
+      proxyEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_ENABLED_PROP_NAME, TransportConstants.DEFAULT_PROXY_ENABLED, configuration);
+      if (proxyEnabled) {
+         proxyHost = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_HOST_PROP_NAME, TransportConstants.DEFAULT_PROXY_HOST, configuration);
+         proxyPort = ConfigurationHelper.getIntProperty(TransportConstants.PROXY_PORT_PROP_NAME, TransportConstants.DEFAULT_PROXY_PORT, configuration);
+
+         int socksVersionNumber = ConfigurationHelper.getIntProperty(TransportConstants.PROXY_VERSION_PROP_NAME, TransportConstants.DEFAULT_PROXY_VERSION, configuration);
+         proxyVersion = SocksVersion.valueOf((byte) socksVersionNumber);
+
+         proxyUsername = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_USERNAME_PROP_NAME, TransportConstants.DEFAULT_PROXY_USERNAME, configuration);
+         proxyPassword = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_PROXY_PASSWORD, configuration);
+      }
+
       remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration);
       remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration);
 
@@ -533,6 +561,26 @@ public class NettyConnector extends AbstractConnector {
          @Override
          public void initChannel(Channel channel) throws Exception {
             final ChannelPipeline pipeline = channel.pipeline();
+
+            if (proxyEnabled && !isTargetLocalHost()) {
+               InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
+               ProxyHandler proxyHandler;
+               switch (proxyVersion) {
+                  case SOCKS5:
+                     proxyHandler = new Socks5ProxyHandler(proxyAddress, proxyUsername, proxyPassword);
+                     break;
+                  case SOCKS4a:
+                     proxyHandler = new Socks4ProxyHandler(proxyAddress, proxyUsername);
+                     break;
+                  default:
+                     throw new IllegalArgumentException("Unknown SOCKS proxy version");
+               }
+
+               channel.pipeline().addLast(proxyHandler);
+
+               logger.debug("Using a SOCKS proxy at " + proxyHost + ":" + proxyPort);
+            }
+
             if (sslEnabled && !useServlet) {
 
                SSLEngine engine;
@@ -1228,6 +1276,16 @@ public class NettyConnector extends AbstractConnector {
       return result;
    }
 
+   private boolean isTargetLocalHost() { // true only when the configured target host resolves to a loopback address
+      try {
+         InetAddress address = InetAddress.getByName(host); // resolves the connector's target host field
+         return address.isLoopbackAddress();
+      } catch (UnknownHostException e) {
+         ActiveMQClientLogger.LOGGER.error("Cannot resolve host", e); // logged, not rethrown; falls through to the return below
+      }
+      return false; // unresolvable host is reported as non-local, so the proxy check in initChannel does not bypass the proxy
+   }
+
    @Override
    public void finalize() throws Throwable {
       close();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index b1cd541..5b715dc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import io.netty.handler.codec.socksx.SocksVersion;
 import io.netty.util.Version;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.jboss.logging.Logger;
@@ -164,6 +165,18 @@ public class TransportConstants {
 
    public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
 
+   public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
+
+   public static final String PROXY_HOST_PROP_NAME = "socksHost";
+
+   public static final String PROXY_PORT_PROP_NAME = "socksPort";
+
+   public static final String PROXY_VERSION_PROP_NAME = "socksVersion";
+
+   public static final String PROXY_USERNAME_PROP_NAME = "socksUsername";
+
+   public static final String PROXY_PASSWORD_PROP_NAME = "socksPassword";
+
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final String DEFAULT_SSL_KRB5_CONFIG = null;
@@ -311,6 +324,18 @@ public class TransportConstants {
     *  When running on a test suite, we need it to be 0, You should see a property on the main pom.xml */
    public static final int DEFAULT_SHUTDOWN_TIMEOUT = parseDefaultVariable("DEFAULT_SHUTDOWN_TIMEOUT", 3_000);
 
+   public static final boolean DEFAULT_PROXY_ENABLED = false;
+
+   public static final String DEFAULT_PROXY_HOST = null;
+
+   public static final int DEFAULT_PROXY_PORT = 0;
+
+   public static final byte DEFAULT_PROXY_VERSION = SocksVersion.SOCKS5.byteValue();
+
+   public static final String DEFAULT_PROXY_USERNAME = null;
+
+   public static final String DEFAULT_PROXY_PASSWORD = null;
+
    private static int parseDefaultVariable(String variableName, int defaultValue) {
       try {
          String variable = System.getProperty(TransportConstants.class.getName() + "." + variableName);
@@ -324,7 +349,6 @@ public class TransportConstants {
       return defaultValue;
    }
 
-
    static {
       Set<String> allowableAcceptorKeys = new HashSet<>();
       allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
@@ -424,6 +448,12 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
+      allowableConnectorKeys.add(TransportConstants.PROXY_ENABLED_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_HOST_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_PORT_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_VERSION_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_USERNAME_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_PASSWORD_PROP_NAME);
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
       allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT);
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index eb79e75..24b06e9 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -34,7 +34,10 @@
 		<bundle>mvn:io.netty/netty-transport/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-buffer/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
+		<bundle>mvn:io.netty/netty-codec-socks/${netty.version}</bundle>
+		<bundle>mvn:io.netty/netty-codec-http/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
+		<bundle>mvn:io.netty/netty-handler-proxy/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
 		<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index ffd2cd4..d31f722 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
+import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -23,6 +24,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.proxy.Socks5ProxyHandler;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -394,4 +397,91 @@ public class NettyConnectorTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testSocksProxyHandlerAdded() throws Exception { // expects a Socks5ProxyHandler in the pipeline when the proxy is enabled and the target is the local host's address
+      BufferHandler handler = new BufferHandler() { // no-op handler; received buffers are ignored
+         @Override
+         public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
+         }
+      };
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, InetAddress.getLocalHost().getHostAddress()); // NOTE(review): the assertion below needs this to be a non-loopback address; getLocalHost() may map to 127.0.0.1 on some hosts — confirm
+      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); // proxy port and SOCKS version are not set, so the connector's defaults apply
+
+      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { // no-op listener; every callback is empty
+         @Override
+         public void connectionException(final Object connectionID, final ActiveMQException me) {
+         }
+
+         @Override
+         public void connectionDestroyed(final Object connectionID) {
+         }
+
+         @Override
+         public void connectionCreated(final ActiveMQComponent component,
+                                       final Connection connection,
+                                       final ClientProtocolManager protocol) {
+         }
+
+         @Override
+         public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         }
+      };
+
+      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory())); // NOTE(review): the three executors created inline are not shut down explicitly in this test — confirm whether close() handles them
+
+      connector.start();
+      Assert.assertTrue(connector.isStarted());
+
+      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); // registers a channel (no connect call here) and waits, so the pipeline can be inspected
+      Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class));
+
+      connector.close(); // not reached if an assertion above fails
+      Assert.assertFalse(connector.isStarted());
+   }
+
+   @Test
+   public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception { // expects no Socks5ProxyHandler in the pipeline when the target host is "localhost", even with the proxy enabled
+      BufferHandler handler = new BufferHandler() { // no-op handler; received buffers are ignored
+         @Override
+         public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
+         }
+      };
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost"); // target host; presumably resolves to a loopback address on the test machine
+      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); // proxy port and SOCKS version are not set, so the connector's defaults apply
+
+      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { // no-op listener; every callback is empty
+         @Override
+         public void connectionException(final Object connectionID, final ActiveMQException me) {
+         }
+
+         @Override
+         public void connectionDestroyed(final Object connectionID) {
+         }
+
+         @Override
+         public void connectionCreated(final ActiveMQComponent component,
+                                       final Connection connection,
+                                       final ClientProtocolManager protocol) {
+         }
+
+         @Override
+         public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         }
+      };
+
+      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory())); // NOTE(review): the three executors created inline are not shut down explicitly in this test — confirm whether close() handles them
+
+      connector.start();
+      Assert.assertTrue(connector.isStarted());
+
+      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); // registers a channel (no connect call here) and waits, so the pipeline can be inspected
+      Assert.assertNull(pipeline.get(Socks5ProxyHandler.class));
+
+      connector.close(); // not reached if an assertion above fails
+      Assert.assertFalse(connector.isStarted());
+   }
 }