You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/23 14:15:45 UTC

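svn commit: r1731852 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/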

Author: rgodfrey
Date: Tue Feb 23 13:15:45 2016
New Revision: 1731852

URL: http://svn.apache.org/viewvc?rev=1731852&view=rev
Log:
QPID-6962 : Add ability to impose max message size on a per vhost basis

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Feb 23 13:15:45 2016
@@ -77,6 +77,11 @@ public interface VirtualHost<X extends V
     @ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
     public static final String VIRTUALHOST_WORK_DIR = VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
 
+    String MAX_MESSAGE_SIZE = "qpid.max_message_size";
+
+    @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
+    int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 32,768,000 bytes (31.25 MiB); note: 500 MiB would be 0x1f400000
+
     @ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
     public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
     String DEFAULT_DLE_NAME_SUFFIX = "_DLE";

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Feb 23 13:15:45 2016
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
@@ -107,6 +108,9 @@ public abstract class AbstractAMQPConnec
 
     private volatile boolean _messageAuthorizationRequired;
 
+    private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
+
+
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -299,6 +303,41 @@ public abstract class AbstractAMQPConnec
         return _clientProduct;
     }
 
+    protected void updateMaxMessageSize()
+    {
+        long maxMessageSize;
+        try
+        {
+            maxMessageSize = getPort().getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+        }
+        catch (NullPointerException | IllegalArgumentException e)
+        {
+            _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
+                         AmqpPort.PORT_MAX_MESSAGE_SIZE,
+                         e);
+            maxMessageSize = Long.MAX_VALUE;
+        }
+        try
+        {
+            maxMessageSize = Math.min(maxMessageSize,
+                                      (long) getVirtualHost().getContextValue(Integer.class, VirtualHost.MAX_MESSAGE_SIZE));
+        }
+        catch (NullPointerException | IllegalArgumentException e)
+        {
+
+            _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
+                         VirtualHost.MAX_MESSAGE_SIZE,
+                         e);
+        }
+
+        _maxMessageSize.set(maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE);
+    }
+
+    public long getMaxMessageSize()
+    {
+        return _maxMessageSize.get();
+    }
+
     public void addDeleteTask(final Action<? super C> task)
     {
         _connectionCloseTaskList.add(task);
@@ -487,6 +526,7 @@ public abstract class AbstractAMQPConnec
     final public void virtualHostAssociated()
     {
         getVirtualHost().registerConnection(this);
+        updateMaxMessageSize();
         _messageAuthorizationRequired = getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Feb 23 13:15:45 2016
@@ -82,7 +82,6 @@ public class ServerConnection extends Co
             new ConcurrentLinkedQueue<>();
 
     private int _messageCompressionThreshold;
-    private final int _maxMessageSize;
 
     private AMQPConnection_0_10 _amqpConnection;
     private boolean _ignoreFutureInput;
@@ -99,8 +98,6 @@ public class ServerConnection extends Co
         _port = port;
         _transport = transport;
 
-        int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
-        _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
 
     }
 
@@ -520,7 +517,7 @@ public class ServerConnection extends Co
 
     public int getMaxMessageSize()
     {
-        return _maxMessageSize;
+        return (int)Math.min(_amqpConnection.getMaxMessageSize(), (long)Integer.MAX_VALUE);
     }
 
     public void transportStateChanged()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Feb 23 13:15:45 2016
@@ -78,7 +78,7 @@ public class ServerSessionTest extends Q
         final Broker<?> broker = mock(Broker.class);
         when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
-        AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+        AmqpPort amqpPort = createMockPort();
 
         ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
         final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
@@ -105,13 +105,13 @@ public class ServerSessionTest extends Q
         final Broker<?> broker = mock(Broker.class);
         when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
-        AmqpPort port = createMockPort(1024);
+        AmqpPort port = createMockPort();
 
         ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
         final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
         Subject subject = new Subject();
         when(protocolEngine.getSubject()).thenReturn(subject);
-
+        when(protocolEngine.getMaxMessageSize()).thenReturn(1024l);
         connection.setAmqpConnection(protocolEngine);
         connection.setVirtualHost(_virtualHost);
         final List<Method> invokedMethods = new ArrayList<>();
@@ -146,10 +146,9 @@ public class ServerSessionTest extends Q
         assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty());
     }
 
-    public AmqpPort createMockPort(int maxMessageSize)
+    public AmqpPort createMockPort()
     {
         AmqpPort port = mock(AmqpPort.class);
-        when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(maxMessageSize);
         TaskExecutor childExecutor = new TaskExecutorImpl();
         childExecutor.start();
         when(port.getChildExecutor()).thenReturn(childExecutor);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Feb 23 13:15:45 2016
@@ -42,6 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
@@ -170,7 +171,6 @@ public class AMQPConnection_0_8
     private volatile int _currentClassId;
     private volatile int _currentMethodId;
     private final int _binaryDataLimit;
-    private final long _maxMessageSize;
     private volatile boolean _transportBlockedForWriting;
 
     public AMQPConnection_0_8(Broker<?> broker,
@@ -193,9 +193,6 @@ public class AMQPConnection_0_8
                 ? getBroker().getContextValue(String.class, Broker.SEND_QUEUE_DELETE_OK_REGARDLESS_CLIENT_VER_REGEXP): "";
         _sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
 
-        int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
-        _maxMessageSize = (maxMessageSize > 0) ? (long) maxMessageSize : Long.MAX_VALUE;
-
         _network = network;
         _sender = network.getSender();
         _closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
@@ -729,6 +726,7 @@ public class AMQPConnection_0_8
         {
             _messageCompressionThreshold = Integer.MAX_VALUE;
         }
+
         getSubject().getPrincipals().add(virtualHost.getPrincipal());
 
         updateAccessControllerContext();
@@ -1345,11 +1343,6 @@ public class AMQPConnection_0_8
         return _binaryDataLimit;
     }
 
-    public long getMaxMessageSize()
-    {
-        return _maxMessageSize;
-    }
-
     public final class WriteDeliverMethod
             implements ClientDeliveryMethod
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org