You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/09/21 14:14:04 UTC
activemq-artemis git commit: ARTEMIS-2093 NPE thrown by
NettyConnector::createConnection
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x 9a949230d -> f90afad1b
ARTEMIS-2093 NPE thrown by NettyConnector::createConnection
Because NettyConnector::createConnection does not run on the
channel's event loop, it can race with a channel close event, which
would clear the whole channel pipeline, leading to an NPE when
createConnection tries to use a channel handler configured in the pipeline.
(cherry picked from commit 3112b4f3db6a77b3d996d72bac65d539d1135ce8)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f90afad1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f90afad1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f90afad1
Branch: refs/heads/2.6.x
Commit: f90afad1b75238ecc11c01fb5b1e7f4537ff26d8
Parents: 9a94923
Author: Francesco Nigro <ni...@gmail.com>
Authored: Fri Sep 21 15:06:53 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Fri Sep 21 16:09:09 2018 +0200
----------------------------------------------------------------------
.../remoting/impl/netty/NettyConnector.java | 40 +++++++++++-
.../remoting/impl/netty/NettyConnectorTest.java | 67 ++++++++++++++++++++
2 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 8dd35e4..31668b3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import io.netty.bootstrap.Bootstrap;
@@ -597,6 +598,7 @@ public class NettyConnector extends AbstractConnector {
protocolManager.addChannelHandlers(pipeline);
pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
+ logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id());
}
});
@@ -712,6 +714,20 @@ public class NettyConnector extends AbstractConnector {
@Override
public Connection createConnection() {
+ return createConnection(null);
+ }
+
+ /**
+ * Create and return a connection from this connector.
+ * <p>
+ * This method must NOT throw an exception if it fails to create the connection
+ * (e.g. network is not available), in this case it MUST return null.<br>
+ * This version can be used for testing purposes.
+ *
+ * @param onConnect a callback that would be called right after {@link Bootstrap#connect()}
+ * @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable)
+ */
+ public final Connection createConnection(Consumer<ChannelFuture> onConnect) {
if (channelClazz == null) {
return null;
}
@@ -733,7 +749,9 @@ public class NettyConnector extends AbstractConnector {
} else {
future = bootstrap.connect(remoteDestination);
}
-
+ if (onConnect != null) {
+ onConnect.accept(future);
+ }
future.awaitUninterruptibly();
if (future.isSuccess()) {
@@ -745,7 +763,15 @@ public class NettyConnector extends AbstractConnector {
if (handshakeFuture.isSuccess()) {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
- channelHandler.active = true;
+ if (channelHandler != null) {
+ channelHandler.active = true;
+ } else {
+ ch.close().awaitUninterruptibly();
+ ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+ new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
+ remoteDestination + " from Channel with id = " + ch.id()));
+ return null;
+ }
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
@@ -805,7 +831,15 @@ public class NettyConnector extends AbstractConnector {
} else {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
- channelHandler.active = true;
+ if (channelHandler != null) {
+ channelHandler.active = true;
+ } else {
+ ch.close().awaitUninterruptibly();
+ ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
+ new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
+ remoteDestination + " from Channel with id = " + ch.id()));
+ return null;
+ }
}
// No acceptor on a client connection
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f90afad1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
index d302be7..e3e279f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
@@ -18,13 +18,20 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
@@ -32,10 +39,40 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class NettyConnectorTest extends ActiveMQTestBase {
+ private ActiveMQServer server;
+ private ExecutorService executorService;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ executorService = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
+ params.put(TransportConstants.SSL_PROVIDER, TransportConstants.OPENSSL_PROVIDER);
+ params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "openssl-server-side-keystore.jks");
+ params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+ params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "openssl-server-side-truststore.jks");
+ params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+ params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
+ ConfigurationImpl config = createBasicConfig().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "nettySSL"));
+ server = createServer(false, config);
+ server.start();
+ waitForServerToStart(server);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ executorService.shutdown();
+ super.tearDown();
+ }
+
private ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
@Override
public void connectionException(final Object connectionID, final ActiveMQException me) {
@@ -197,4 +234,34 @@ public class NettyConnectorTest extends ActiveMQTestBase {
connector.close();
Assert.assertFalse(connector.isStarted());
}
+
+ @Test
+ public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception {
+ BufferHandler handler = (connectionID, buffer) -> {
+ };
+ Map<String, Object> params = new HashMap<>();
+ final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+ final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+ final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+ try {
+ NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+ connector.start();
+ final Connection connection = connector.createConnection(future -> {
+ future.awaitUninterruptibly();
+ Assert.assertTrue(future.isSuccess());
+ final ChannelPipeline pipeline = future.channel().pipeline();
+ final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class);
+ Assert.assertNotNull(activeMQChannelHandler);
+ pipeline.remove(activeMQChannelHandler);
+ Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
+ });
+ Assert.assertNull(connection);
+ connector.close();
+ } finally {
+ closeExecutor.shutdownNow();
+ threadPool.shutdownNow();
+ scheduledThreadPool.shutdownNow();
+ }
+ }
+
}