You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/05/05 03:47:07 UTC
[2/4] activemq-artemis git commit: ACTIVEMQ6-96 acceptor limit
ACTIVEMQ6-96 acceptor limit
Adds a configuration property on both in-vm and Netty acceptors
whereby the number of connections allowed is configurable.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3eb835a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3eb835a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3eb835a8
Branch: refs/heads/master
Commit: 3eb835a8ab86cca7de3848a003dd70c15f0d186d
Parents: adb0b2b
Author: jbertram <jb...@apache.org>
Authored: Mon May 4 11:15:33 2015 -0500
Committer: jbertram <jb...@apache.org>
Committed: Mon May 4 16:07:27 2015 -0500
----------------------------------------------------------------------
.../remoting/impl/netty/TransportConstants.java | 5 +
.../core/remoting/impl/invm/InVMAcceptor.java | 16 ++++
.../core/remoting/impl/invm/InVMConnector.java | 19 +++-
.../remoting/impl/invm/TransportConstants.java | 4 +
.../core/remoting/impl/netty/NettyAcceptor.java | 55 +++++++----
docs/user-manual/en/configuring-transports.md | 8 ++
.../integration/server/ConnectionLimitTest.java | 96 ++++++++++++++++++++
7 files changed, 179 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index b7299ed..74e435f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -191,6 +191,10 @@ public class TransportConstants
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
+ public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
+
+ public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
+
static
{
Set<String> allowableAcceptorKeys = new HashSet<String>();
@@ -224,6 +228,7 @@ public class TransportConstants
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
+ allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
index 1c5a53b..857f503 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
@@ -64,6 +64,8 @@ public final class InVMAcceptor implements Acceptor
private ActiveMQPrincipal defaultActiveMQPrincipal;
+ private final long connectionsAllowed;
+
public InVMAcceptor(final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
final BufferHandler handler,
@@ -81,6 +83,10 @@ public final class InVMAcceptor implements Acceptor
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
executorFactory = new OrderedExecutorFactory(threadPool);
+
+ connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
+ TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
+ configuration);
}
public Map<String, Object> getConfiguration()
@@ -93,6 +99,16 @@ public final class InVMAcceptor implements Acceptor
return clusterConnection;
}
+ public long getConnectionsAllowed()
+ {
+ return connectionsAllowed;
+ }
+
+ public int getConnectionCount()
+ {
+ return connections.size();
+ }
+
public synchronized void start() throws Exception
{
if (started)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
index fb1b4ad..f9b7462 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
@@ -154,12 +154,21 @@ public class InVMConnector extends AbstractConnector
return null;
}
- Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory()
- .getExecutor());
-
- acceptor.connect((String)conn.getID(), handler, this, executorFactory.getExecutor());
+ if (acceptor.getConnectionsAllowed() == -1 || acceptor.getConnectionCount() < acceptor.getConnectionsAllowed())
+ {
+ Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory().getExecutor());
- return conn;
+ acceptor.connect((String) conn.getID(), handler, this, executorFactory.getExecutor());
+ return conn;
+ }
+ else
+ {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+ {
+ ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(acceptor.getConnectionsAllowed()).append(" reached. Refusing connection."));
+ }
+ return null;
+ }
}
public synchronized void start()
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
index d01f5a8..7905c9c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
@@ -23,6 +23,10 @@ public final class TransportConstants
public static final int DEFAULT_SERVER_ID = 0;
+ public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
+
+ public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
+
private TransportConstants()
{
// Utility class
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 8ed06be..53a8d69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -168,6 +168,8 @@ public class NettyAcceptor implements Acceptor
private final boolean httpUpgradeEnabled;
+ private final long connectionsAllowed;
+
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@@ -288,6 +290,10 @@ public class NettyAcceptor implements Acceptor
httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED,
configuration);
+
+ connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
+ TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
+ configuration);
}
public synchronized void start() throws Exception
@@ -711,36 +717,47 @@ public class NettyAcceptor implements Acceptor
public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception
{
- super.channelActive(ctx);
- Listener connectionListener = new Listener();
+ if (connectionsAllowed == -1 || connections.size() < connectionsAllowed)
+ {
+ super.channelActive(ctx);
+ Listener connectionListener = new Listener();
- NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
+ NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
- connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
+ connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
- SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
- if (sslHandler != null)
- {
- sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
+ SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+ if (sslHandler != null)
{
- public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
{
- if (future.isSuccess())
+ public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
{
- active = true;
- }
- else
- {
- future.getNow().close();
+ if (future.isSuccess())
+ {
+ active = true;
+ }
+ else
+ {
+ future.getNow().close();
+ }
}
- }
- });
+ });
+ }
+ else
+ {
+ active = true;
+ }
+ return nc;
}
else
{
- active = true;
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+ {
+ ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(connectionsAllowed).append(" reached. Refusing connection from ").append(ctx.channel().remoteAddress()));
+ }
+ throw new Exception();
}
- return nc;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/docs/user-manual/en/configuring-transports.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md
index cf04627..6ac2886 100644
--- a/docs/user-manual/en/configuring-transports.md
+++ b/docs/user-manual/en/configuring-transports.md
@@ -285,6 +285,14 @@ Netty for simple TCP:
connector will let the system pick up an ephemeral port. valid ports
are 0 to 65535
+- `connectionsAllowed`. This is only valid for acceptors. It limits the
+ number of connections which the acceptor will allow. When this limit
+ is reached a DEBUG level message is issued to the log, and the connection
+ is refused. The type of client in use will determine what happens when
+ the connection is refused. In the case of a `core` client, it will
+ result in an `org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException`.
+
+
## Configuring Netty SSL
Netty SSL is similar to the Netty TCP transport but it provides
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
new file mode 100644
index 0000000..0832f24
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
+import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.tests.util.UnitTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConnectionLimitTest extends UnitTestCase
+{
+ private ActiveMQServer server;
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ Map nettyParams = new HashMap();
+ nettyParams.put(TransportConstants.CONNECTIONS_ALLOWED, 1);
+
+ Map invmParams = new HashMap();
+ invmParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.CONNECTIONS_ALLOWED, 1);
+
+ Configuration configuration = createBasicConfig()
+ .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyParams))
+ .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmParams));
+
+ server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
+ server.start();
+ }
+
+ @Test
+ public void testInVMConnectionLimit() throws Exception
+ {
+ ServerLocator locator = addServerLocator(createNonHALocator(false));
+ ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
+
+ try
+ {
+ ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
+ fail("creating a session factory here should fail");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof ActiveMQNotConnectedException);
+ }
+ }
+
+ @Test
+ public void testNettyConnectionLimit() throws Exception
+ {
+ ServerLocator locator = addServerLocator(createNonHALocator(true));
+ locator.setCallTimeout(3000);
+ ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
+ ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
+ ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
+
+ try
+ {
+ ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
+ fail("creating a session here should fail");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof ActiveMQConnectionTimedOutException);
+ }
+ }
+}
\ No newline at end of file