You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/05/05 03:47:07 UTC

[2/4] activemq-artemis git commit: ACTIVEMQ6-96 acceptor limit

ACTIVEMQ6-96 acceptor limit

Adds a configuration property on both in-vm and Netty acceptors
whereby the number of connections allowed is configurable.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3eb835a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3eb835a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3eb835a8

Branch: refs/heads/master
Commit: 3eb835a8ab86cca7de3848a003dd70c15f0d186d
Parents: adb0b2b
Author: jbertram <jb...@apache.org>
Authored: Mon May 4 11:15:33 2015 -0500
Committer: jbertram <jb...@apache.org>
Committed: Mon May 4 16:07:27 2015 -0500

----------------------------------------------------------------------
 .../remoting/impl/netty/TransportConstants.java |  5 +
 .../core/remoting/impl/invm/InVMAcceptor.java   | 16 ++++
 .../core/remoting/impl/invm/InVMConnector.java  | 19 +++-
 .../remoting/impl/invm/TransportConstants.java  |  4 +
 .../core/remoting/impl/netty/NettyAcceptor.java | 55 +++++++----
 docs/user-manual/en/configuring-transports.md   |  8 ++
 .../integration/server/ConnectionLimitTest.java | 96 ++++++++++++++++++++
 7 files changed, 179 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index b7299ed..74e435f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -191,6 +191,10 @@ public class TransportConstants
 
    public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
 
+   public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
+
+   public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
+
    static
    {
       Set<String> allowableAcceptorKeys = new HashSet<String>();
@@ -224,6 +228,7 @@ public class TransportConstants
       allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
       allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
       allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
+      allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
       allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
       allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
index 1c5a53b..857f503 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
@@ -64,6 +64,8 @@ public final class InVMAcceptor implements Acceptor
 
    private ActiveMQPrincipal defaultActiveMQPrincipal;
 
+   private final long connectionsAllowed;
+
    public InVMAcceptor(final ClusterConnection clusterConnection,
                        final Map<String, Object> configuration,
                        final BufferHandler handler,
@@ -81,6 +83,10 @@ public final class InVMAcceptor implements Acceptor
       id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
 
       executorFactory = new OrderedExecutorFactory(threadPool);
+
+      connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
+                                                               TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
+                                                               configuration);
    }
 
    public Map<String, Object> getConfiguration()
@@ -93,6 +99,16 @@ public final class InVMAcceptor implements Acceptor
       return clusterConnection;
    }
 
+   public long getConnectionsAllowed()
+   {
+      return connectionsAllowed;
+   }
+
+   public int getConnectionCount()
+   {
+      return connections.size();
+   }
+
    public synchronized void start() throws Exception
    {
       if (started)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
index fb1b4ad..f9b7462 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java
@@ -154,12 +154,21 @@ public class InVMConnector extends AbstractConnector
          return null;
       }
 
-      Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory()
-                                                                                                .getExecutor());
-
-      acceptor.connect((String)conn.getID(), handler, this, executorFactory.getExecutor());
+      if (acceptor.getConnectionsAllowed() == -1 || acceptor.getConnectionCount() < acceptor.getConnectionsAllowed())
+      {
+         Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory().getExecutor());
 
-      return conn;
+         acceptor.connect((String) conn.getID(), handler, this, executorFactory.getExecutor());
+         return conn;
+      }
+      else
+      {
+         if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(acceptor.getConnectionsAllowed()).append(" reached. Refusing connection."));
+         }
+         return null;
+      }
    }
 
    public synchronized void start()

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
index d01f5a8..7905c9c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
@@ -23,6 +23,10 @@ public final class TransportConstants
 
    public static final int DEFAULT_SERVER_ID = 0;
 
+   public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
+
+   public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
+
    private TransportConstants()
    {
       // Utility class

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 8ed06be..53a8d69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -168,6 +168,8 @@ public class NettyAcceptor implements Acceptor
 
    private final boolean httpUpgradeEnabled;
 
+   private final long connectionsAllowed;
+
    public NettyAcceptor(final String name,
                         final ClusterConnection clusterConnection,
                         final Map<String, Object> configuration,
@@ -288,6 +290,10 @@ public class NettyAcceptor implements Acceptor
       httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
                                                                   TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED,
                                                                   configuration);
+
+      connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
+                                                               TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
+                                                               configuration);
    }
 
    public synchronized void start() throws Exception
@@ -711,36 +717,47 @@ public class NettyAcceptor implements Acceptor
 
       public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception
       {
-         super.channelActive(ctx);
-         Listener connectionListener = new Listener();
+         if (connectionsAllowed == -1 || connections.size() < connectionsAllowed)
+         {
+            super.channelActive(ctx);
+            Listener connectionListener = new Listener();
 
-         NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
+            NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
 
-         connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
+            connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
 
-         SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
-         if (sslHandler != null)
-         {
-            sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
+            SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+            if (sslHandler != null)
             {
-               public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
+               sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
                {
-                  if (future.isSuccess())
+                  public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
                   {
-                     active = true;
-                  }
-                  else
-                  {
-                     future.getNow().close();
+                     if (future.isSuccess())
+                     {
+                        active = true;
+                     }
+                     else
+                     {
+                        future.getNow().close();
+                     }
                   }
-               }
-            });
+               });
+            }
+            else
+            {
+               active = true;
+            }
+            return nc;
          }
          else
          {
-            active = true;
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(connectionsAllowed).append(" reached. Refusing connection from ").append(ctx.channel().remoteAddress()));
+            }
+            throw new Exception();
          }
-         return nc;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/docs/user-manual/en/configuring-transports.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md
index cf04627..6ac2886 100644
--- a/docs/user-manual/en/configuring-transports.md
+++ b/docs/user-manual/en/configuring-transports.md
@@ -285,6 +285,14 @@ Netty for simple TCP:
     connector will let the system pick up an ephemeral port. valid ports
     are 0 to 65535
 
+-   `connectionsAllowed`. This is only valid for acceptors. It limits the
+    number of connections which the acceptor will allow. When this limit
+    is reached a DEBUG level message is issued to the log, and the connection
+    is refused. The type of client in use will determine what happens when
+    the connection is refused. In the case of a `core` client, it will
+    result in a `org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException`.
+
+
 ## Configuring Netty SSL
 
 Netty SSL is similar to the Netty TCP transport but it provides

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3eb835a8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
new file mode 100644
index 0000000..0832f24
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
+import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.tests.util.UnitTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConnectionLimitTest extends UnitTestCase
+{
+   private ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      Map nettyParams = new HashMap();
+      nettyParams.put(TransportConstants.CONNECTIONS_ALLOWED, 1);
+
+      Map invmParams = new HashMap();
+      invmParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.CONNECTIONS_ALLOWED, 1);
+
+      Configuration configuration = createBasicConfig()
+              .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyParams))
+              .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmParams));
+
+      server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
+      server.start();
+   }
+
+   @Test
+   public void testInVMConnectionLimit() throws Exception
+   {
+      ServerLocator locator = addServerLocator(createNonHALocator(false));
+      ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
+
+      try
+      {
+         ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
+         fail("creating a session factory here should fail");
+      }
+      catch (Exception e)
+      {
+         assertTrue(e instanceof ActiveMQNotConnectedException);
+      }
+   }
+
+   @Test
+   public void testNettyConnectionLimit() throws Exception
+   {
+      ServerLocator locator = addServerLocator(createNonHALocator(true));
+      locator.setCallTimeout(3000);
+      ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
+      ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
+      ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
+
+      try
+      {
+         ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
+         fail("creating a session here should fail");
+      }
+      catch (Exception e)
+      {
+         assertTrue(e instanceof ActiveMQConnectionTimedOutException);
+      }
+   }
+}
\ No newline at end of file