You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/21 20:05:01 UTC

[activemq-artemis] branch master updated: ARTEMIS-2847 socks5h support

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e3ed8e8  ARTEMIS-2847 socks5h support
     new 46e622e  This closes #3221
e3ed8e8 is described below

commit e3ed8e815b515b123539d1be00a3d2b502f3a701
Author: Scott Werner <61...@users.noreply.github.com>
AuthorDate: Thu Jul 16 01:56:53 2020 -0500

    ARTEMIS-2847 socks5h support
    
    Added 'socksRemoteDNS' transport parameter.
    If set to true, remote destination socket is created unresolved
    and DNS resolution is disabled.
---
 .../core/remoting/impl/netty/NettyConnector.java   |  18 +-
 .../remoting/impl/netty/TransportConstants.java    |   5 +
 .../remoting/impl/netty/NettyConnectorTest.java    | 116 ---------
 .../core/remoting/impl/netty/SocksProxyTest.java   | 283 +++++++++++++++++++++
 4 files changed, 304 insertions(+), 118 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index e677e1d..4fb9066 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -98,6 +98,7 @@ import io.netty.handler.proxy.ProxyHandler;
 import io.netty.handler.proxy.Socks4ProxyHandler;
 import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.resolver.NoopAddressResolverGroup;
 import io.netty.util.AttributeKey;
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
@@ -210,6 +211,8 @@ public class NettyConnector extends AbstractConnector {
 
    private String proxyPassword;
 
+   private boolean proxyRemoteDNS;
+
    private boolean useServlet;
 
    private String host;
@@ -354,6 +357,8 @@ public class NettyConnector extends AbstractConnector {
 
          proxyUsername = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_USERNAME_PROP_NAME, TransportConstants.DEFAULT_PROXY_USERNAME, configuration);
          proxyPassword = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_PROXY_PASSWORD, configuration);
+
+         proxyRemoteDNS = ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME, TransportConstants.DEFAULT_PROXY_REMOTE_DNS, configuration);
       }
 
       remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration);
@@ -564,7 +569,7 @@ public class NettyConnector extends AbstractConnector {
          public void initChannel(Channel channel) throws Exception {
             final ChannelPipeline pipeline = channel.pipeline();
 
-            if (proxyEnabled && !isTargetLocalHost()) {
+            if (proxyEnabled && (proxyRemoteDNS || !isTargetLocalHost())) {
                InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
                ProxyHandler proxyHandler;
                switch (proxyVersion) {
@@ -581,6 +586,10 @@ public class NettyConnector extends AbstractConnector {
                channel.pipeline().addLast(proxyHandler);
 
                logger.debug("Using a SOCKS proxy at " + proxyHost + ":" + proxyPort);
+
+               if (proxyRemoteDNS) {
+                  bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
+               }
             }
 
             if (sslEnabled && !useServlet) {
@@ -800,7 +809,12 @@ public class NettyConnector extends AbstractConnector {
          return null;
       }
 
-      InetSocketAddress remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port);
+      InetSocketAddress remoteDestination;
+      if (proxyEnabled && proxyRemoteDNS) {
+         remoteDestination = InetSocketAddress.createUnresolved(IPV6Util.stripBracketsAndZoneID(host), port);
+      } else {
+         remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port);
+      }
 
       logger.debug("Remote destination: " + remoteDestination);
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 728d6b9..018b2ba 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -181,6 +181,8 @@ public class TransportConstants {
 
    public static final String PROXY_PASSWORD_PROP_NAME = "socksPassword";
 
+   public static final String PROXY_REMOTE_DNS_PROP_NAME = "socksRemoteDNS";
+
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final String DEFAULT_SSL_KRB5_CONFIG = null;
@@ -340,6 +342,8 @@ public class TransportConstants {
 
    public static final String DEFAULT_PROXY_PASSWORD = null;
 
+   public static final boolean DEFAULT_PROXY_REMOTE_DNS = false;
+
    private static int parseDefaultVariable(String variableName, int defaultValue) {
       try {
          String variable = System.getProperty(TransportConstants.class.getName() + "." + variableName);
@@ -461,6 +465,7 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.PROXY_VERSION_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.PROXY_USERNAME_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.PROXY_PASSWORD_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME);
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
       allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index 67eed00..6f4a8a2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -27,7 +23,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import io.netty.channel.ChannelPipeline;
-import io.netty.handler.proxy.Socks5ProxyHandler;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -44,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -399,114 +393,4 @@ public class NettyConnectorTest extends ActiveMQTestBase {
          scheduledThreadPool.shutdownNow();
       }
    }
-
-   @Test
-   public void testSocksProxyHandlerAdded() throws Exception {
-      InetAddress address = getNonLoopbackAddress();
-      Assume.assumeTrue("Cannot find non-loopback address", address != null);
-
-      BufferHandler handler = (connectionID, buffer) -> {
-      };
-      Map<String, Object> params = new HashMap<>();
-
-      params.put(TransportConstants.HOST_PROP_NAME, address.getHostAddress());
-      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
-      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
-
-      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
-         @Override
-         public void connectionException(final Object connectionID, final ActiveMQException me) {
-         }
-
-         @Override
-         public void connectionDestroyed(final Object connectionID) {
-         }
-
-         @Override
-         public void connectionCreated(final ActiveMQComponent component,
-                                       final Connection connection,
-                                       final ClientProtocolManager protocol) {
-         }
-
-         @Override
-         public void connectionReadyForWrites(Object connectionID, boolean ready) {
-         }
-      };
-
-      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
-
-      connector.start();
-      Assert.assertTrue(connector.isStarted());
-
-      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
-      Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class));
-
-      connector.close();
-      Assert.assertFalse(connector.isStarted());
-   }
-
-   private InetAddress getNonLoopbackAddress() throws SocketException {
-      Enumeration<NetworkInterface> n = NetworkInterface.getNetworkInterfaces();
-      InetAddress addr = null;
-      for (; n.hasMoreElements(); ) {
-         NetworkInterface e = n.nextElement();
-         Enumeration<InetAddress> a = e.getInetAddresses();
-         boolean found = false;
-         for (; a.hasMoreElements(); ) {
-            addr = a.nextElement();
-            if (!addr.isLoopbackAddress()) {
-               found = true;
-               break;
-            }
-         }
-         if (found) {
-            break;
-         }
-      }
-      return addr;
-   }
-
-   @Test
-   public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception {
-      BufferHandler handler = new BufferHandler() {
-         @Override
-         public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
-         }
-      };
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
-      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
-      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
-
-      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
-         @Override
-         public void connectionException(final Object connectionID, final ActiveMQException me) {
-         }
-
-         @Override
-         public void connectionDestroyed(final Object connectionID) {
-         }
-
-         @Override
-         public void connectionCreated(final ActiveMQComponent component,
-                                       final Connection connection,
-                                       final ClientProtocolManager protocol) {
-         }
-
-         @Override
-         public void connectionReadyForWrites(Object connectionID, boolean ready) {
-         }
-      };
-
-      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
-
-      connector.start();
-      Assert.assertTrue(connector.isStarted());
-
-      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
-      Assert.assertNull(pipeline.get(Socks5ProxyHandler.class));
-
-      connector.close();
-      Assert.assertFalse(connector.isStarted());
-   }
 }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
new file mode 100644
index 0000000..a0ce33a
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.proxy.Socks5ProxyHandler;
+import io.netty.resolver.AddressResolverGroup;
+import io.netty.resolver.NoopAddressResolverGroup;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SocksProxyTest extends ActiveMQTestBase {
+
+   private static final int SOCKS_PORT = 1080;
+
+   private ExecutorService closeExecutor;
+   private ExecutorService threadPool;
+   private ScheduledExecutorService scheduledThreadPool;
+
+   private NioEventLoopGroup bossGroup;
+   private NioEventLoopGroup workerGroup;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      closeExecutor       = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      threadPool          = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+
+      startSocksProxy();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      closeExecutor.shutdownNow();
+      threadPool.shutdownNow();
+      scheduledThreadPool.shutdownNow();
+
+      stopSocksProxy();
+
+      super.tearDown();
+   }
+
+   @Test
+   public void testSocksProxyHandlerAdded() throws Exception {
+      InetAddress address = getNonLoopbackAddress();
+      Assume.assumeTrue("Cannot find non-loopback address", address != null);
+
+      BufferHandler handler = (connectionID, buffer) -> {
+      };
+
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, address.getHostAddress());
+      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+
+      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
+         @Override
+         public void connectionException(final Object connectionID, final ActiveMQException me) {
+         }
+
+         @Override
+         public void connectionDestroyed(final Object connectionID) {
+         }
+
+         @Override
+         public void connectionCreated(final ActiveMQComponent component,
+                                       final Connection connection,
+                                       final ClientProtocolManager protocol) {
+         }
+
+         @Override
+         public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         }
+      };
+
+      NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+
+      connector.start();
+      Assert.assertTrue(connector.isStarted());
+
+      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
+      Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class));
+
+      connector.close();
+      Assert.assertFalse(connector.isStarted());
+   }
+
+   private InetAddress getNonLoopbackAddress() throws SocketException {
+      Enumeration<NetworkInterface> n = NetworkInterface.getNetworkInterfaces();
+      InetAddress addr = null;
+      for (; n.hasMoreElements(); ) {
+         NetworkInterface e = n.nextElement();
+         Enumeration<InetAddress> a = e.getInetAddresses();
+         boolean found = false;
+         for (; a.hasMoreElements(); ) {
+            addr = a.nextElement();
+            if (!addr.isLoopbackAddress()) {
+               found = true;
+               break;
+            }
+         }
+         if (found) {
+            break;
+         }
+      }
+      return addr;
+   }
+
+   @Test
+   public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception {
+      BufferHandler handler = (connectionID, buffer) -> {
+      };
+
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+
+      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
+         @Override
+         public void connectionException(final Object connectionID, final ActiveMQException me) {
+         }
+
+         @Override
+         public void connectionDestroyed(final Object connectionID) {
+         }
+
+         @Override
+         public void connectionCreated(final ActiveMQComponent component,
+                                       final Connection connection,
+                                       final ClientProtocolManager protocol) {
+         }
+
+         @Override
+         public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         }
+      };
+
+      NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+
+      connector.start();
+      Assert.assertTrue(connector.isStarted());
+
+      ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline();
+      Assert.assertNull(pipeline.get(Socks5ProxyHandler.class));
+
+      connector.close();
+      Assert.assertFalse(connector.isStarted());
+   }
+
+   @Test
+   public void testSocks5hSupport() throws Exception {
+      BufferHandler handler = (connectionID, buffer) -> {
+      };
+      Map<String, Object> params = new HashMap<>();
+
+      params.put(TransportConstants.HOST_PROP_NAME, "only-resolvable-on-proxy");
+      params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost");
+      params.put(TransportConstants.PROXY_PORT_PROP_NAME, SOCKS_PORT);
+      params.put(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME, true);
+
+      ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
+         @Override
+         public void connectionException(final Object connectionID, final ActiveMQException me) {
+         }
+
+         @Override
+         public void connectionDestroyed(final Object connectionID) {
+         }
+
+         @Override
+         public void connectionCreated(final ActiveMQComponent component,
+                                       final Connection connection,
+                                       final ClientProtocolManager protocol) {
+         }
+
+         @Override
+         public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         }
+      };
+
+      NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+
+      connector.start();
+      Assert.assertTrue(connector.isStarted());
+
+      connector.getBootStrap().register().await().channel().pipeline();
+
+      AddressResolverGroup<?> resolver = connector.getBootStrap().config().resolver();
+      Assert.assertSame(resolver, NoopAddressResolverGroup.INSTANCE);
+
+      Connection connection = connector.createConnection(future -> {
+         future.awaitUninterruptibly();
+         Assert.assertTrue(future.isSuccess());
+
+         Socks5ProxyHandler socks5Handler = future.channel().pipeline().get(Socks5ProxyHandler.class);
+         Assert.assertNotNull(socks5Handler);
+
+         InetSocketAddress remoteAddress = (InetSocketAddress)socks5Handler.destinationAddress();
+         Assert.assertTrue(remoteAddress.isUnresolved());
+      });
+      Assert.assertNotNull(connection);
+
+      Assert.assertTrue(connection.isOpen());
+      connection.close();
+      Assert.assertFalse(connection.isOpen());
+
+      connector.close();
+      Assert.assertFalse(connector.isStarted());
+   }
+
+   private void startSocksProxy() throws Exception {
+      bossGroup   = new NioEventLoopGroup();
+      workerGroup = new NioEventLoopGroup();
+
+      ServerBootstrap b = new ServerBootstrap();
+      b.group(bossGroup, workerGroup);
+      b.channel(NioServerSocketChannel.class);
+      b.childHandler(new ChannelInitializer<SocketChannel>() {
+         @Override
+         protected void initChannel(SocketChannel ch) throws Exception {
+            // We can further configure SOCKS, but have to assume Netty is doing the right thing,
+            // we just need something listening on the port to make the initial connection
+         }
+      });
+
+      b.bind(SOCKS_PORT).sync();
+   }
+
+   private void stopSocksProxy() {
+      bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
+      workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
+   }
+}