You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/09/21 14:14:04 UTC

activemq-artemis git commit: ARTEMIS-2093 NPE thrown by NettyConnector::createConnection

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 9a949230d -> f90afad1b


ARTEMIS-2093 NPE thrown by NettyConnector::createConnection

Given that NettyConnector::createConnection isn't happening on the
channel's event loop, it could race with a channel close event, that
would clean the whole channel pipeline, leading to a NPE while
trying to use a configured channel handler of the pipeline.

(cherry picked from commit 3112b4f3db6a77b3d996d72bac65d539d1135ce8)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f90afad1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f90afad1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f90afad1

Branch: refs/heads/2.6.x
Commit: f90afad1b75238ecc11c01fb5b1e7f4537ff26d8
Parents: 9a94923
Author: Francesco Nigro <ni...@gmail.com>
Authored: Fri Sep 21 15:06:53 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Fri Sep 21 16:09:09 2018 +0200

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnector.java     | 40 +++++++++++-
 .../remoting/impl/netty/NettyConnectorTest.java | 67 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 8dd35e4..31668b3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import io.netty.bootstrap.Bootstrap;
@@ -597,6 +598,7 @@ public class NettyConnector extends AbstractConnector {
             protocolManager.addChannelHandlers(pipeline);
 
             pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
+            logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id());
          }
       });
 
@@ -712,6 +714,20 @@ public class NettyConnector extends AbstractConnector {
 
    @Override
    public Connection createConnection() {
+      return createConnection(null);
+   }
+
+   /**
+    * Create and return a connection from this connector.
+    * <p>
+    * This method must NOT throw an exception if it fails to create the connection
+    * (e.g. network is not available), in this case it MUST return null.<br>
+    * This version can be used for testing purposes.
+    *
+    * @param onConnect a callback that would be called right after {@link Bootstrap#connect()}
+    * @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable)
+    */
+   public final Connection createConnection(Consumer<ChannelFuture> onConnect) {
       if (channelClazz == null) {
          return null;
       }
@@ -733,7 +749,9 @@ public class NettyConnector extends AbstractConnector {
       } else {
          future = bootstrap.connect(remoteDestination);
       }
-
+      if (onConnect != null) {
+         onConnect.accept(future);
+      }
       future.awaitUninterruptibly();
 
       if (future.isSuccess()) {
@@ -745,7 +763,15 @@ public class NettyConnector extends AbstractConnector {
                if (handshakeFuture.isSuccess()) {
                   ChannelPipeline channelPipeline = ch.pipeline();
                   ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
-                  channelHandler.active = true;
+                  if (channelHandler != null) {
+                     channelHandler.active = true;
+                  } else {
+                     ch.close().awaitUninterruptibly();
+                     ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+                        new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
+                                                     remoteDestination + " from Channel with id = " + ch.id()));
+                     return null;
+                  }
                } else {
                   ch.close().awaitUninterruptibly();
                   ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
@@ -805,7 +831,15 @@ public class NettyConnector extends AbstractConnector {
          } else {
             ChannelPipeline channelPipeline = ch.pipeline();
             ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
-            channelHandler.active = true;
+            if (channelHandler != null) {
+               channelHandler.active = true;
+            } else {
+               ch.close().awaitUninterruptibly();
+               ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+                  new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
+                                               remoteDestination + " from Channel with id = " + ch.id()));
+               return null;
+            }
          }
 
          // No acceptor on a client connection

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index d302be7..e3e279f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -18,13 +18,20 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
+import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
 import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
@@ -32,10 +39,40 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class NettyConnectorTest extends ActiveMQTestBase {
 
+   private ActiveMQServer server;
+   private ExecutorService executorService;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      executorService = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
+      params.put(TransportConstants.SSL_PROVIDER, TransportConstants.OPENSSL_PROVIDER);
+      params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "openssl-server-side-keystore.jks");
+      params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+      params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME,  "openssl-server-side-truststore.jks");
+      params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+      params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
+      ConfigurationImpl config = createBasicConfig().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "nettySSL"));
+      server = createServer(false, config);
+      server.start();
+      waitForServerToStart(server);
+   }
+
+   @Override
+   public void tearDown() throws Exception {
+      executorService.shutdown();
+      super.tearDown();
+   }
+
    private ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
       @Override
       public void connectionException(final Object connectionID, final ActiveMQException me) {
@@ -197,4 +234,34 @@ public class NettyConnectorTest extends ActiveMQTestBase {
       connector.close();
       Assert.assertFalse(connector.isStarted());
    }
+
+   @Test
+   public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception {
+      BufferHandler handler = (connectionID, buffer) -> {
+      };
+      Map<String, Object> params = new HashMap<>();
+      final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+      final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+      try {
+         NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+         connector.start();
+         final Connection connection = connector.createConnection(future -> {
+            future.awaitUninterruptibly();
+            Assert.assertTrue(future.isSuccess());
+            final ChannelPipeline pipeline = future.channel().pipeline();
+            final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class);
+            Assert.assertNotNull(activeMQChannelHandler);
+            pipeline.remove(activeMQChannelHandler);
+            Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
+         });
+         Assert.assertNull(connection);
+         connector.close();
+      } finally {
+         closeExecutor.shutdownNow();
+         threadPool.shutdownNow();
+         scheduledThreadPool.shutdownNow();
+      }
+   }
+
 }