Posted to dev@qpid.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/05/31 21:41:00 UTC

[jira] [Commented] (QPID-8484) [Broker-J] Reimplementation of the limit number of connections per user

    [ https://issues.apache.org/jira/browse/QPID-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354661#comment-17354661 ] 

ASF GitHub Bot commented on QPID-8484:
--------------------------------------

alex-rufous commented on a change in pull request #76:
URL: https://github.com/apache/qpid-broker-j/pull/76#discussion_r642569553



##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionSlot.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+@FunctionalInterface
+public interface ConnectionSlot extends Runnable

Review comment:
       As far as I understand, the responsibility of this interface is to provide a mechanism to de-register a connection from the connection limiter when the connection is closed.
   There is already an existing mechanism for performing pending close/delete activities: the "delete task".
   Please use the existing mechanism. There is no need to introduce a new one.
   
   If the existing mechanism has flaws, those flaws have to be fixed. For example, if throwing an exception from a delete task breaks the deregistration logic, please add exception handling to the execution of "delete tasks" to make sure that all of them get executed regardless of failing ones. If required, the delete task exceptions could be re-thrown after all of them have been executed.
   
   Thus, I strongly believe that the ConnectionSlot interface is redundant and duplicates the existing "delete task" functionality. Please delete ConnectionSlot.
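   
   A minimal sketch of the exception handling described above, under stated assumptions: DeleteTaskRunner and its methods are hypothetical names used only for illustration and are not the Broker-J delete task implementation. It runs every task even when some fail, then re-throws the first failure with the later ones attached as suppressed exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Broker-J code: runs every delete task even if some fail.
final class DeleteTaskRunner
{
    private final List<Runnable> _tasks = new ArrayList<>();

    void addDeleteTask(Runnable task)
    {
        _tasks.add(task);
    }

    // Executes all registered tasks. The first failure is re-thrown once every
    // task has run; later failures are attached to it as suppressed exceptions.
    void runAll()
    {
        RuntimeException failure = null;
        for (Runnable task : _tasks)
        {
            try
            {
                task.run();
            }
            catch (RuntimeException e)
            {
                if (failure == null)
                {
                    failure = e;
                }
                else
                {
                    failure.addSuppressed(e);
                }
            }
        }
        _tasks.clear();
        if (failure != null)
        {
            throw failure;
        }
    }
}
```

   With this shape, a deregistration task added after a failing task still runs, so the limiter's counters would not leak.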
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
##########
@@ -148,10 +148,7 @@ void registerTransactionTickers(ServerTransaction serverTransaction,
     @Override
     AmqpPort<?> getPort();
 
-    void registered(ConnectionPrincipalStatistics connectionPrincipalStatistics);
-
-    int getAuthenticatedPrincipalConnectionCount();
-
-    int getAuthenticatedPrincipalConnectionFrequency();
+    void addConnectionSlot(NamedAddressSpace addressSpace);

Review comment:
       Please remove the new methods in the AMQPConnection interface:
   
   void addConnectionSlot(NamedAddressSpace addressSpace, String userId) throws ConnectionLimitException;
   void freeConnectionSlots();
       
   As per the review comments for ConnectionSlot, the "delete task" should be used to perform deregistration activities. The introduced methods addConnectionSlot and freeConnectionSlots duplicate the "delete task" functionality.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);

Review comment:
       What is the responsibility of this method?
   How many underlying implementations can be appended?
   What happens when append is called multiple times? How do multiple appended ConnectionLimiters behave?
   If the intention is to call the method only once, it should be renamed to setUnderlyingConnectionLimiter.
   
   Which object is responsible for calling the append method?
   Why is this method actually required on the ConnectionLimiter interface?

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {

Review comment:
       The implementation is really confusing.
   
   Both NoLimits and BlockEveryone are CachedLimiters.
   If either of them is passed in here, the real CachedConnectionLimiterImpl will not be created.
   
   To me, that looks like a defect in the implementation.
   
   Why is this method required?
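   
   The concern can be reproduced with a stripped-down sketch. The types below (Limiter, CachingMarker, NoLimitsStub, CachingWrapper, LimiterFactory) are simplified stand-ins invented for illustration and are not the classes from the pull request; only the instanceof check mirrors cacheConnectionRegistration in the diff.

```java
// Simplified stand-ins for the types in the diff; not the real Broker-J classes.
interface Limiter
{
}

// Plays the role of CachedLimiter.
interface CachingMarker extends Limiter
{
}

// Plays the role of NoLimits: a singleton that is declared as "cached".
final class NoLimitsStub implements CachingMarker
{
    static final CachingMarker INSTANCE = new NoLimitsStub();

    private NoLimitsStub()
    {
    }
}

// Plays the role of CachedConnectionLimiterImpl.
final class CachingWrapper implements CachingMarker
{
    final Limiter _delegate;

    CachingWrapper(Limiter delegate)
    {
        _delegate = delegate;
    }
}

final class LimiterFactory
{
    // Same shape as cacheConnectionRegistration: anything already marked
    // as cached is returned as-is and never wrapped.
    static CachingMarker cacheConnectionRegistration(Limiter limiter)
    {
        if (limiter instanceof CachingMarker)
        {
            return (CachingMarker) limiter;
        }
        return new CachingWrapper(limiter);
    }
}
```

   In this sketch, passing NoLimitsStub.INSTANCE returns the very same instance, while a plain Limiter is wrapped, which is the behaviour questioned above.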
   
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {
+        if (limiter instanceof CachedLimiter) {
+            return (CachedLimiter) limiter;
+        }
+        return new CachedConnectionLimiterImpl(limiter);
+    }
+
+    interface CachedLimiter extends ConnectionLimiter
+    {
+        ConnectionLimiter underlyingLimiter();
+
+        @Override
+        default ConnectionLimiter append(ConnectionLimiter limiter) {
+            return underlyingLimiter().append(limiter);
+        }
+    }
+
+    final class NoLimits implements CachedLimiter
+    {
+        static CachedLimiter INSTANCE = new NoLimits();
+
+        private NoLimits()
+        {
+            super();
+        }
+
+        @Override
+        public ConnectionSlot register(AMQPConnection<?> connection)
+        {
+            return FreeSlot.INSTANCE;
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return Optional.ofNullable(limiter).orElse(this);
+        }
+
+        @Override
+        public ConnectionLimiter underlyingLimiter()
+        {
+            return this;

Review comment:
       If NoLimits did not implement CachedLimiter, this implementation could be deleted.
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
##########
@@ -48,8 +49,13 @@
 
     boolean registerConnection(AMQPConnection<?> connection,
                                final ConnectionEstablishmentPolicy connectionEstablishmentPolicy);
+
     void deregisterConnection(AMQPConnection<?> connection);
 
+    default ConnectionSlot requestConnectionSlot(AMQPConnection<?> connection)

Review comment:
       At the moment only the authenticated principal is passed into the method to check the limits. What about group principals? If a limit is set for an entire group, how are groups supposed to be checked?
   
   Does it make sense to also check group principals in addition to the authenticated principal?
   Please note that there is no reason to pass userId. The AMQPConnection comes with methods to get the authorized principal (getAuthorizedPrincipal) and the subject (getSubject).
   
   I would like to suggest removing the userId parameter.
   
   The method needs to return void. (Potentially, boolean could be returned.)
   
   I think the method should be renamed to checkUserConnectionLimit or something more appropriate. I do not think the name requestConnectionSlot represents what the method does.
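   
   To illustrate checking group principals together with the user, here is a rough sketch that works from a javax.security.auth.Subject instead of a userId. SubjectConnectionCounter, its single shared limit, and its method names are assumptions made for this example, not part of the pull request; it also assumes that principals implement equals/hashCode consistently, and it does not try to be a complete concurrency design.

```java
import java.security.Principal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.Subject;

// Hypothetical sketch, not Broker-J code: one shared limit applied to every
// principal (user and groups alike) found in the connection's Subject.
final class SubjectConnectionCounter
{
    private final Map<Principal, AtomicInteger> _counts = new ConcurrentHashMap<>();
    private final int _limit;

    SubjectConnectionCounter(int limit)
    {
        _limit = limit;
    }

    // Registers the connection against each principal of the subject.
    // If any principal exceeds the limit, the increments made so far are
    // rolled back and false is returned.
    boolean register(Subject subject)
    {
        final List<Principal> registered = new ArrayList<>();
        for (Principal principal : subject.getPrincipals())
        {
            final int count = _counts.computeIfAbsent(principal, p -> new AtomicInteger()).incrementAndGet();
            registered.add(principal);
            if (count > _limit)
            {
                registered.forEach(p -> _counts.get(p).decrementAndGet());
                return false;
            }
        }
        return true;
    }

    // Releases one connection for each principal of the subject.
    void deregister(Subject subject)
    {
        for (Principal principal : subject.getPrincipals())
        {
            final AtomicInteger count = _counts.get(principal);
            if (count != null)
            {
                count.decrementAndGet();
            }
        }
    }

    int connectionCount(Principal principal)
    {
        final AtomicInteger count = _counts.get(principal);
        return count == null ? 0 : count.get();
    }
}
```

   In the broker, the Subject would presumably come from the connection's getSubject method mentioned above, so no separate userId argument would be needed.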

##########
File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -1161,28 +1164,18 @@ public void incrementTransactionBeginCounter()
     }
 
     @Override
-    public void registered(final ConnectionPrincipalStatistics connectionPrincipalStatistics)
+    public void freeConnectionSlots()
     {
-        _connectionPrincipalStatistics = connectionPrincipalStatistics;
-    }
-
-    @Override
-    public int getAuthenticatedPrincipalConnectionCount()
-    {
-        if (_connectionPrincipalStatistics == null)
+        ConnectionSlot slot;
+        while ((slot = _registeredConnectionSlots.poll()) != null)
         {
-            return 0;
+            slot.free();
         }
-        return _connectionPrincipalStatistics.getConnectionCount();
     }
 
     @Override
-    public int getAuthenticatedPrincipalConnectionFrequency()
+    public void addConnectionSlot(NamedAddressSpace addressSpace)

Review comment:
       As per the comments above, please use #addDeleteTask to create a callback for deregistering the connection from the limiter.
   Please remove addConnectionSlot.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -149,7 +151,8 @@
     private long _maxUncommittedInMemorySize;
 
     private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = new ConcurrentHashMap<>();
-    private volatile ConnectionPrincipalStatistics _connectionPrincipalStatistics;
+
+    private final Queue<ConnectionSlot> _registeredConnectionSlots = new ConcurrentLinkedQueue<>();

Review comment:
       Please remove _registeredConnectionSlots.
   
   As per the review comments for ConnectionSlot, the "delete task" should be used to perform deregistration activities.
   
   It seems that the field _registeredConnectionSlots duplicates the field _connectionCloseTaskList.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/CachedConnectionLimiterImpl.java
##########
@@ -0,0 +1,78 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.security.limit.ConnectionLimiter.CachedLimiter;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+final class CachedConnectionLimiterImpl implements CachedLimiter

Review comment:
       Marec,
   Could you please explain why CachedConnectionLimiterImpl is required?
   What is the use case for this implementation?
   
   Does it mean that the same CachedLimiter can be called multiple times? If so, why does the implementation allow that?
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {
+        if (limiter instanceof CachedLimiter) {
+            return (CachedLimiter) limiter;
+        }
+        return new CachedConnectionLimiterImpl(limiter);
+    }
+
+    interface CachedLimiter extends ConnectionLimiter
+    {
+        ConnectionLimiter underlyingLimiter();
+
+        @Override
+        default ConnectionLimiter append(ConnectionLimiter limiter) {
+            return underlyingLimiter().append(limiter);
+        }
+    }
+
+    final class NoLimits implements CachedLimiter
+    {
+        static CachedLimiter INSTANCE = new NoLimits();
+
+        private NoLimits()
+        {
+            super();
+        }
+
+        @Override
+        public ConnectionSlot register(AMQPConnection<?> connection)
+        {
+            return FreeSlot.INSTANCE;
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return Optional.ofNullable(limiter).orElse(this);
+        }
+
+        @Override
+        public ConnectionLimiter underlyingLimiter()
+        {
+            return this;
+        }
+    }
+
+    final class BlockEveryone implements CachedLimiter
+    {
+        static CachedLimiter INSTANCE = new BlockEveryone();
+
+        private BlockEveryone()
+        {
+            super();
+        }
+
+        @Override
+        public ConnectionSlot register(AMQPConnection<?> connection)
+        {
+            throw new ConnectionLimitException("Opening any new connection is forbidden");
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return this;
+        }
+
+        @Override
+        public ConnectionLimiter underlyingLimiter()
+        {
+            return this;

Review comment:
       The implementation indicates that BlockEveryone should not really implement CachedLimiter

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterContext.java
##########
@@ -0,0 +1,34 @@
+package org.apache.qpid.server.security.limit;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedContextDefault;
+import org.apache.qpid.server.model.ManagedObject;
+
+@ManagedObject(category = false)
+public interface ConnectionLimiterContext<X extends ConfiguredObject<X>> extends ConfiguredObject<X>

Review comment:
       Why is ConnectionLimiterContext a configured object?
   What is the purpose of this interface?
   IMHO, the context variable should be defined on the ConnectionLimiterProvider.
   Why can't we merge this interface into BrokerConnectionLimiter?

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);

Review comment:
       As per the comments on ConnectionSlot, the ConnectionSlot interface seems redundant to me.
   Thus, once ConnectionSlot is deleted, this method should return void.

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {
+        if (limiter instanceof CachedLimiter) {
+            return (CachedLimiter) limiter;
+        }
+        return new CachedConnectionLimiterImpl(limiter);
+    }
+
+    interface CachedLimiter extends ConnectionLimiter
+    {
+        ConnectionLimiter underlyingLimiter();
+
+        @Override
+        default ConnectionLimiter append(ConnectionLimiter limiter) {
+            return underlyingLimiter().append(limiter);
+        }
+    }
+
+    final class NoLimits implements CachedLimiter

Review comment:
       Why does NoLimits implement CachedLimiter?

##########
File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
##########
@@ -0,0 +1,294 @@
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerConnectionLimitProvider;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostConnectionLimitProvider;
+import org.apache.qpid.server.security.limit.ConnectionLimitProvider;
+import org.apache.qpid.server.security.limit.ConnectionLimiter;
+import org.apache.qpid.server.security.limit.ConnectionSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+final class VirtualHostConnectionLimiter implements ConnectionLimiter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(VirtualHostConnectionLimiter.class);
+
+    private final VirtualHost<?> _virtualHost;
+
+    private final Broker<?> _broker;
+
+    private final Map<ConnectionLimitProvider, ConnectionLimiter> _connectionLimitProviders = new ConcurrentHashMap<>();
+
+    private final AtomicReference<ConnectionLimiter> _limiter = new AtomicReference<>(ConnectionLimiter.noLimits());
+
+    VirtualHostConnectionLimiter(VirtualHost<?> virtualHost, Broker<?> broker)
+    {
+        super();
+        _virtualHost = Objects.requireNonNull(virtualHost);
+        _broker = Objects.requireNonNull(broker);
+    }
+
+    @Override
+    public ConnectionSlot register(AMQPConnection<?> connection)
+    {
+        return _limiter.get().register(connection);
+    }
+
+    @Override
+    public ConnectionLimiter append(ConnectionLimiter limiter)
+    {
+        return _limiter.get().append(limiter);
+    }
+
+    public void open()
+    {
+        _virtualHost.addChangeListener(new VirtualHostChangeListener(this));
+        _broker.addChangeListener(new BrokerChangeListener(this));
+
+        _virtualHost.getChildren(VirtualHostConnectionLimitProvider.class)
+                .forEach(child -> child.addChangeListener(new ProviderChangeListener(this)));
+        _broker.getChildren(BrokerConnectionLimitProvider.class)
+                .forEach(child -> child.addChangeListener(new ProviderChangeListener(this)));
+    }
+
+    public void activate()
+    {
+        update();
+    }
+
+    public void close()
+    {
+        _virtualHost.removeChangeListener(new VirtualHostChangeListener(this));
+        _broker.removeChangeListener(new BrokerChangeListener(this));
+
+        _virtualHost.getChildren(VirtualHostConnectionLimitProvider.class)
+                .forEach(child -> child.removeChangeListener(new ProviderChangeListener(this)));
+        _broker.getChildren(BrokerConnectionLimitProvider.class)
+                .forEach(child -> child.removeChangeListener(new ProviderChangeListener(this)));
+
+        _limiter.set(ConnectionLimiter.noLimits());
+    }
+
+    private void update(ConfiguredObject<?> object)
+    {
+        _connectionLimitProviders.remove(object);
+        update();
+    }
+
+    private void update()
+    {
+        if (!((SystemConfig<?>) _broker.getParent()).isManagementMode())
+        {
+            _limiter.set(newLimiter(_connectionLimitProviders));
+        }
+    }
+
+    private ConnectionLimiter newLimiter(final Map<ConnectionLimitProvider, ConnectionLimiter> cache)
+    {
+        ConnectionLimiter limiter = ConnectionLimiter.noLimits();
+
+        LOGGER.debug("Updating virtual host connection limiters");
+        for (final VirtualHostConnectionLimitProvider<?> provider :
+                _virtualHost.getChildren(VirtualHostConnectionLimitProvider.class))
+        {
+            if (provider.getState() == State.ACTIVE)
+            {
+                limiter = limiter.append(
+                        cache.computeIfAbsent(provider, ConnectionLimitProvider::getConnectionLimiter));
+            }
+            else if (provider.getState() == State.ERRORED)
+            {
+                limiter = ConnectionLimiter.blockEveryone();
+            }
+        }
+
+        LOGGER.debug("Updating broker connection limiters");
+        for (final BrokerConnectionLimitProvider<?> provider :
+                _broker.getChildren(BrokerConnectionLimitProvider.class))
+        {
+            if (provider.getState() == State.ACTIVE)
+            {
+                limiter = limiter.append(
+                        cache.computeIfAbsent(provider, ConnectionLimitProvider::getConnectionLimiter));
+            }
+            else if (provider.getState() == State.ERRORED)
+            {
+                limiter = ConnectionLimiter.blockEveryone();
+            }
+        }
+        return ConnectionLimiter.cacheConnectionRegistration(limiter);
+    }
+
+    private abstract static class AbstractChangeListener extends AbstractConfigurationChangeListener
+    {
+        private final VirtualHostConnectionLimiter _limiter;
+
+        AbstractChangeListener(VirtualHostConnectionLimiter limiter)
+        {
+            super();
+            _limiter = Objects.requireNonNull(limiter);
+        }
+
+        void addProvider(ConfiguredObject<?> provider)
+        {
+            provider.addChangeListener(new ProviderChangeListener(_limiter));
+            _limiter.update();
+        }
+
+        void removeProvider(ConfiguredObject<?> provider)
+        {
+            provider.removeChangeListener(new ProviderChangeListener(_limiter));
+            _limiter.update(provider);
+        }
+
+        void updateProvider(ConfiguredObject<?> provider)
+        {
+            _limiter.update(provider);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _limiter.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (obj instanceof AbstractChangeListener)
+            {
+                return _limiter == ((AbstractChangeListener) obj)._limiter;
+            }
+            return false;
+        }
+    }
+
+    private static final class VirtualHostChangeListener extends AbstractChangeListener

Review comment:
       The VirtualHostChangeListener code is a duplicate of BrokerChangeListener.
   It seems it can be parametrised with the category class and the child category class. Those can be passed into the constructor.
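   
   A rough sketch of such a parametrised listener, assuming simplified types: CategoryChildListener and its Object-based callbacks are hypothetical and only stand in for the real listener classes and ConfiguredObject arguments in the pull request.

```java
// Hypothetical sketch, not Broker-J code: a single listener class that is
// configured with the parent category and the child category it watches.
final class CategoryChildListener<P, C>
{
    private final Class<P> _category;
    private final Class<C> _childCategory;
    private final Runnable _onChange;

    CategoryChildListener(Class<P> category, Class<C> childCategory, Runnable onChange)
    {
        _category = category;
        _childCategory = childCategory;
        _onChange = onChange;
    }

    // Returns true if the change was handled, i.e. the parent and the child
    // belong to the configured categories.
    boolean childAdded(Object parent, Object child)
    {
        return handle(parent, child);
    }

    boolean childRemoved(Object parent, Object child)
    {
        return handle(parent, child);
    }

    private boolean handle(Object parent, Object child)
    {
        if (_category.isInstance(parent) && _childCategory.isInstance(child))
        {
            _onChange.run();
            return true;
        }
        return false;
    }
}
```

   One instance could then be created for the virtual host with its provider category and another for the broker with its provider category, instead of keeping two near-identical listener classes.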
   
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
##########
@@ -2691,6 +2704,12 @@ public String getArguments()
         });
     }
 
+    @Override
+    public ConnectionSlot requestConnectionSlot(AMQPConnection<?> connection)
+    {
+        return _connectionLimiter.register(connection);

Review comment:
       I am still not convinced that the implementation of VirtualHostConnectionLimiter brings any benefits here.
   IMHO, iterating over the VirtualHost and Broker ConnectionLimitProviders will do exactly the same job as VirtualHostConnectionLimiter with its complex ecosystem of ConfigurationObjectListeners.
   
   Here is what I believe the implementation of requestConnectionSlot should look like without VirtualHostConnectionLimiter (the code is for illustration purposes; I did not pay much attention to syntax):
   
   
       @Override
       public ConnectionSlot requestConnectionSlot(AMQPConnection<?> connection)
       {
           final Collection<VirtualHostConnectionLimitProvider> hostLimiters = _virtualHost.getChildren(VirtualHostConnectionLimitProvider.class);
           final Collection<BrokerConnectionLimitProvider> brokerLimiters = _broker.getChildren(BrokerConnectionLimitProvider.class);
           final List<ConnectionLimitProvider> allLimiters = new ArrayList<>();
           allLimiters.addAll(hostLimiters);
           allLimiters.addAll(brokerLimiters);
   
           boolean success = false;
           try
           {
               for (ConnectionLimitProvider limiter : allLimiters)
               {
                   limiter.registerAndCheckLimit(connection);
               }
               success = true;
           }
           finally
           {
               if (success)
               {
                   // Deregister from the providers present at delete time.
                   connection.addDeletedTask(() -> {
                       final List<ConnectionLimitProvider> currentLimiters = new ArrayList<>();
                       currentLimiters.addAll(_virtualHost.getChildren(VirtualHostConnectionLimitProvider.class));
                       currentLimiters.addAll(_broker.getChildren(BrokerConnectionLimitProvider.class));
                       for (ConnectionLimitProvider limiter : currentLimiters)
                       {
                           limiter.deregister(connection);
                       }
                   });
               }
               else
               {
                   // Registration failed: deregister the connection from every provider.
                   for (ConnectionLimitProvider limiter : allLimiters)
                   {
                       limiter.deregister(connection);
                   }
               }
           }
       }
   
   The advantage of the above is that you do not need to maintain a complex relationship between the ConnectionLimitProviders, so the amount of code to maintain is reduced significantly.
   The ConnectionLimitProvider interface would be very simple:
   
       interface ConnectionLimitProvider {
   
          void registerAndCheckLimit(AMQPConnection<?> connection);
          void deregister(AMQPConnection<?> connection);
       }
   
   If required, we can introduce a priority so that the ConnectionLimitProviders are called in priority order.
   
   The disadvantage of such an approach is that each ConnectionLimitProvider implementation would have to maintain and manage its connections separately.
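
   To make that bookkeeping cost concrete, here is a rough sketch of what a single provider might have to track on its own. It assumes connections are identified by a user name and a connection id (plain strings here instead of AMQPConnection), and PerUserLimitProvider is a hypothetical name, not a class from the PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical provider that limits open connections per user.
// It has to remember which connections it accepted, so that deregistering
// a connection it never registered does not free somebody else's slot.
final class PerUserLimitProvider
{
    private final int _limit;
    private final ConcurrentMap<String, Set<String>> _connectionsByUser = new ConcurrentHashMap<>();

    PerUserLimitProvider(int limit)
    {
        _limit = limit;
    }

    // Registers the connection, or throws if the user already holds the maximum.
    void registerAndCheckLimit(String user, String connectionId)
    {
        // compute() is atomic per key, so the check and the update cannot interleave.
        _connectionsByUser.compute(user, (name, ids) -> {
            Set<String> current = ids == null ? Collections.<String>emptySet() : ids;
            if (!current.contains(connectionId) && current.size() >= _limit)
            {
                throw new IllegalStateException("User " + name + " reached the limit of " + _limit);
            }
            // Copy on write keeps the stored sets immutable and safe to read.
            Set<String> updated = new HashSet<>(current);
            updated.add(connectionId);
            return Collections.unmodifiableSet(updated);
        });
    }

    // Removes the connection if it was registered; otherwise does nothing.
    void deregister(String user, String connectionId)
    {
        _connectionsByUser.computeIfPresent(user, (name, ids) -> {
            Set<String> updated = new HashSet<>(ids);
            updated.remove(connectionId);
            // Returning null drops the map entry once the user has no connections left.
            return updated.isEmpty() ? null : Collections.unmodifiableSet(updated);
        });
    }

    int openConnections(String user)
    {
        return _connectionsByUser.getOrDefault(user, Collections.<String>emptySet()).size();
    }
}
```

   Every provider written in this style would carry a similar map, which is the duplication mentioned above.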
   
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
##########
@@ -1161,28 +1161,35 @@ public void incrementTransactionBeginCounter()
     }
 
     @Override
-    public void registered(final ConnectionPrincipalStatistics connectionPrincipalStatistics)
+    public void freeConnectionSlots()

Review comment:
       Marek,
   You can add exception handling to the code that executes the delete tasks to make sure that all of them are invoked. That would ensure that the connection is always deregistered.
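
   Roughly what I mean, as a sketch only. DeleteTaskRunner is a hypothetical class that stands in for whatever currently executes the delete tasks, and collecting the failures instead of logging them is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical runner: a failing task must not prevent the remaining
// tasks (for example the deregistration) from being executed.
final class DeleteTaskRunner
{
    private final List<Runnable> _tasks = new ArrayList<>();

    void addDeleteTask(Runnable task)
    {
        _tasks.add(task);
    }

    // Runs every task once and returns the exceptions thrown along the way.
    List<RuntimeException> runAll()
    {
        final List<RuntimeException> failures = new ArrayList<>();
        for (Runnable task : _tasks)
        {
            try
            {
                task.run();
            }
            catch (RuntimeException e)
            {
                // Remember the failure and carry on with the next task.
                failures.add(e);
            }
        }
        _tasks.clear();
        return failures;
    }
}
```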

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {
+        if (limiter instanceof CachedLimiter) {
+            return (CachedLimiter) limiter;
+        }
+        return new CachedConnectionLimiterImpl(limiter);
+    }
+
+    interface CachedLimiter extends ConnectionLimiter
+    {
+        ConnectionLimiter underlyingLimiter();
+
+        @Override
+        default ConnectionLimiter append(ConnectionLimiter limiter) {
+            return underlyingLimiter().append(limiter);
+        }
+    }
+
+    final class NoLimits implements CachedLimiter
+    {
+        static CachedLimiter INSTANCE = new NoLimits();
+
+        private NoLimits()
+        {
+            super();
+        }
+
+        @Override
+        public ConnectionSlot register(AMQPConnection<?> connection)
+        {
+            return FreeSlot.INSTANCE;
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return Optional.ofNullable(limiter).orElse(this);
+        }
+
+        @Override
+        public ConnectionLimiter underlyingLimiter()
+        {
+            return this;
+        }
+    }
+
+    final class BlockEveryone implements CachedLimiter

Review comment:
       Why does BlockEveryone need to implement CachedLimiter?

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
##########
@@ -542,69 +539,64 @@ public static String getInstalledProtocolsAsString()
     }
 
     @Override
-    public int incrementConnectionCount()
-    {
-        int openConnections = _connectionCount.incrementAndGet();
-        _totalConnectionCount.incrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
-        if(maxOpenConnections > 0
-           && openConnections > (maxOpenConnections * _connectionWarnCount) / 100
-           && _connectionCountWarningGiven.compareAndSet(false, true))
-        {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_COUNT_WARN(openConnections,
-                                                                                _connectionWarnCount,
-                                                                                maxOpenConnections));
-        }
-        return openConnections;
-    }
-
-    @Override
-    public int decrementConnectionCount()
+    public long decrementConnectionCount()
     {
-        int openConnections = _connectionCount.decrementAndGet();
-        int maxOpenConnections = getMaxOpenConnections();
+        final long openConnections = _connectionCount.decrementAndGet();
+        final long maxOpenConnections = getMaxOpenConnections();
 
-        if(maxOpenConnections > 0
-           && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000)
+        if (maxOpenConnections > 0
+                && openConnections < (maxOpenConnections * square(_connectionWarnCount)) / 10000L)
         {
-           _connectionCountWarningGiven.compareAndSet(true,false);
+            _connectionCountWarningGiven.compareAndSet(true, false);
         }
-
-
         return openConnections;
     }
 
-    private static int square(int val)
+    private static long square(long val)
     {
         return val * val;
     }
 
     @Override
     public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
     {
-        String addressString = remoteSocketAddress.toString();
+        final String addressString = remoteSocketAddress.toString();
         if (_closingOrDeleting.get())
         {
             _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
+                    PortMessages.CONNECTION_REJECTED_CLOSED(addressString));
             return false;
         }
-        else if (_maxOpenConnections > 0 && _connectionCount.get() >= _maxOpenConnections)
+
+        final long maxOpenConnections = getMaxOpenConnections();
+        if (maxOpenConnections > 0)
         {
-            _container.getEventLogger().message(new PortLogSubject(this),
-                                                PortMessages.CONNECTION_REJECTED_TOO_MANY(addressString,
-                                                                                       _maxOpenConnections));
-            return false;
+            final long currentCount = _connectionCount.getAndUpdate(count -> count < maxOpenConnections ? count + 1L : count) + 1L;

Review comment:
       The implemented change does not belong to the scope of this PR. IMHO, it is a separate change. If there is an issue with the counter, it has to be addressed in its own JIRA.
   
   The current change mixes atomic and non-atomic operations. That is always a recipe for disaster in a multi-threaded environment.
   
   To be fair, I would rather have the code changed to something like the one below
   
       boolean limitReached = false;
       for (;;) {
               long current = _connectionCount.get();
               limitReached = current >= maxOpenConnections;
               long newValue = limitReached ? current : current + 1L;
               if (_connectionCount.compareAndSet(current, newValue))
                   break;
       }
   
   Though, I am not sure whether even the code above is correct. The original implementation expected the counter to be incremented on connection open and decremented on connection close. Thus, we need to check that the close functionality properly decrements the counter.
   
   It could be even safer to go with something like below
   
       long currentCount = _connectionCount.incrementAndGet();
       if (currentCount > maxOpenConnections)
       {
           _container.getEventLogger().message(....);
           _connectionCount.decrementAndGet();
           return false;
       }
   
   I would really prefer to create a separate PR for the change
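
   For reference, a self-contained sketch of the compare-and-set variant, assuming the counter is wrapped in its own small class. BoundedConnectionCounter, tryAcquire and release are hypothetical names, not part of AmqpPortImpl:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper: the limit check and the increment happen in one
// atomic step, so the counter never exceeds the maximum.
final class BoundedConnectionCounter
{
    private final AtomicLong _count = new AtomicLong();
    private final long _max;

    BoundedConnectionCounter(long max)
    {
        _max = max;
    }

    // Returns true and takes a slot if one is free, false otherwise.
    boolean tryAcquire()
    {
        for (;;)
        {
            final long current = _count.get();
            if (current >= _max)
            {
                return false;
            }
            // Retry if another thread changed the counter in the meantime.
            if (_count.compareAndSet(current, current + 1L))
            {
                return true;
            }
        }
    }

    // Gives a slot back; the counter is never pushed below zero.
    void release()
    {
        _count.updateAndGet(count -> count > 0L ? count - 1L : 0L);
    }

    long current()
    {
        return _count.get();
    }
}
```

   With a maximum of 5 and 7 attempts, exactly 5 calls to tryAcquire succeed and 2 are rejected. The close path would still have to call release exactly once per successful tryAcquire.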
   

##########
File path: broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiter.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security.limit;
+
+import java.util.Optional;
+
+import org.apache.qpid.server.security.limit.ConnectionSlot.FreeSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+public interface ConnectionLimiter
+{
+    ConnectionSlot register(AMQPConnection<?> connection);
+
+    ConnectionLimiter append(ConnectionLimiter limiter);
+
+    static CachedLimiter noLimits()
+    {
+        return NoLimits.INSTANCE;
+    }
+
+    static CachedLimiter blockEveryone()
+    {
+        return BlockEveryone.INSTANCE;
+    }
+
+    static CachedLimiter cacheConnectionRegistration(ConnectionLimiter limiter) {
+        if (limiter instanceof CachedLimiter) {
+            return (CachedLimiter) limiter;
+        }
+        return new CachedConnectionLimiterImpl(limiter);
+    }
+
+    interface CachedLimiter extends ConnectionLimiter
+    {
+        ConnectionLimiter underlyingLimiter();
+
+        @Override
+        default ConnectionLimiter append(ConnectionLimiter limiter) {
+            return underlyingLimiter().append(limiter);
+        }
+    }
+
+    final class NoLimits implements CachedLimiter
+    {
+        static CachedLimiter INSTANCE = new NoLimits();
+
+        private NoLimits()
+        {
+            super();
+        }
+
+        @Override
+        public ConnectionSlot register(AMQPConnection<?> connection)
+        {
+            return FreeSlot.INSTANCE;
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return Optional.ofNullable(limiter).orElse(this);

Review comment:
       I would like to understand the reasons behind this implementation.
   The implemented behaviour looks curious. The output depends on the input... Is that correct?

##########
File path: broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
##########
@@ -48,8 +49,13 @@
 
     boolean registerConnection(AMQPConnection<?> connection,
                                final ConnectionEstablishmentPolicy connectionEstablishmentPolicy);
+
     void deregisterConnection(AMQPConnection<?> connection);
 
+    default ConnectionSlot requestConnectionSlot(AMQPConnection<?> connection)

Review comment:
       Why is this method required?






> [Broker-J] Reimplementation of the limit number of connections per user
> -----------------------------------------------------------------------
>
>                 Key: QPID-8484
>                 URL: https://issues.apache.org/jira/browse/QPID-8484
>             Project: Qpid
>          Issue Type: Improvement
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-8.0.1, qpid-java-broker-8.0.2
>            Reporter: Marek Laca
>            Priority: Major
>              Labels: connection, limit, user
>
> If a user creates too many connections, they can prevent other users from connecting to AMQP ports. [QPID-8369|https://issues.apache.org/jira/projects/QPID/issues/QPID-8369] added the ability to limit the maximum number of open connections per user.
>  The user connection limit was implemented as ACL dynamic rule and it is part of the access control logic.
> But I have queries about the implementation:
>  * The connection count of the user is not checked properly.
>  For example, 2 connections should be rejected when a user has a limit of 5 and tries to open 7 parallel connections. But what happens:
>  ## Every connection increments the counter then the counter will be 7.
>  ## ACL logic compares the actual counter value with the limit for every connection (the counter value at the moment of the acl rule check) and 2,3 … or all 7 connections can be denied. The ACL logic does not know which connection broke the limit.
>  ## The counter is decremented when connection is closed.
>  * ACL rules were static and so the result of the check did not vary in time and the Broker could cache the result ALLOWED or DENIED. From my point of view a dynamic check should not be part of the ACL logic because it makes ACL logic time dependent.
>  * The user connection limit should be checked as soon as possible.
> I suggest to introduce a new plugin (similar to the access control provider plugin) that will hold the user's counters of open connections.
>  It will provide following functionality:
>  * It registers new connections for given user and the connection will be rejected if the user breaks the limit. The registration and update of user's counters will be an atomic operation.
>  * It de-registers the connections for given user.
> If user breaks the limit then the connection will be closed with proper response "amqp:resource-limit-exceeded" instead of "amqp:not-allowed".


