You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/23 14:15:45 UTC
svn commit: r1731852 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
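broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/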
Author: rgodfrey
Date: Tue Feb 23 13:15:45 2016
New Revision: 1731852
URL: http://svn.apache.org/viewvc?rev=1731852&view=rev
Log:
QPID-6962 : Add ability to impose max message size on a per vhost basis
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Feb 23 13:15:45 2016
@@ -77,6 +77,11 @@ public interface VirtualHost<X extends V
@ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
public static final String VIRTUALHOST_WORK_DIR = VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
+ String MAX_MESSAGE_SIZE = "qpid.max_message_size";
+
+ @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
+ int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
+
@ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Tue Feb 23 13:15:45 2016
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -107,6 +108,9 @@ public abstract class AbstractAMQPConnec
private volatile boolean _messageAuthorizationRequired;
+ private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
+
+
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
AmqpPort<?> port,
@@ -299,6 +303,41 @@ public abstract class AbstractAMQPConnec
return _clientProduct;
}
+ protected void updateMaxMessageSize()
+ {
+ long maxMessageSize;
+ try
+ {
+ maxMessageSize = getPort().getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+ }
+ catch (NullPointerException | IllegalArgumentException e)
+ {
+ _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
+ AmqpPort.PORT_MAX_MESSAGE_SIZE,
+ e);
+ maxMessageSize = Long.MAX_VALUE;
+ }
+ try
+ {
+ maxMessageSize = Math.min(maxMessageSize,
+ (long) getVirtualHost().getContextValue(Integer.class, VirtualHost.MAX_MESSAGE_SIZE));
+ }
+ catch (NullPointerException | IllegalArgumentException e)
+ {
+
+ _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
+ VirtualHost.MAX_MESSAGE_SIZE,
+ e);
+ }
+
+ _maxMessageSize.set(maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE);
+ }
+
+ public long getMaxMessageSize()
+ {
+ return _maxMessageSize.get();
+ }
+
public void addDeleteTask(final Action<? super C> task)
{
_connectionCloseTaskList.add(task);
@@ -487,6 +526,7 @@ public abstract class AbstractAMQPConnec
final public void virtualHostAssociated()
{
getVirtualHost().registerConnection(this);
+ updateMaxMessageSize();
_messageAuthorizationRequired = getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Feb 23 13:15:45 2016
@@ -82,7 +82,6 @@ public class ServerConnection extends Co
new ConcurrentLinkedQueue<>();
private int _messageCompressionThreshold;
- private final int _maxMessageSize;
private AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
@@ -99,8 +98,6 @@ public class ServerConnection extends Co
_port = port;
_transport = transport;
- int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
- _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
}
@@ -520,7 +517,7 @@ public class ServerConnection extends Co
public int getMaxMessageSize()
{
- return _maxMessageSize;
+ return (int)Math.min(_amqpConnection.getMaxMessageSize(), (long)Integer.MAX_VALUE);
}
public void transportStateChanged()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Feb 23 13:15:45 2016
@@ -78,7 +78,7 @@ public class ServerSessionTest extends Q
final Broker<?> broker = mock(Broker.class);
when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
- AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+ AmqpPort amqpPort = createMockPort();
ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
@@ -105,13 +105,13 @@ public class ServerSessionTest extends Q
final Broker<?> broker = mock(Broker.class);
when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
- AmqpPort port = createMockPort(1024);
+ AmqpPort port = createMockPort();
ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
Subject subject = new Subject();
when(protocolEngine.getSubject()).thenReturn(subject);
-
+ when(protocolEngine.getMaxMessageSize()).thenReturn(1024l);
connection.setAmqpConnection(protocolEngine);
connection.setVirtualHost(_virtualHost);
final List<Method> invokedMethods = new ArrayList<>();
@@ -146,10 +146,9 @@ public class ServerSessionTest extends Q
assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty());
}
- public AmqpPort createMockPort(int maxMessageSize)
+ public AmqpPort createMockPort()
{
AmqpPort port = mock(AmqpPort.class);
- when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(maxMessageSize);
TaskExecutor childExecutor = new TaskExecutorImpl();
childExecutor.start();
when(port.getChildExecutor()).thenReturn(childExecutor);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1731852&r1=1731851&r2=1731852&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Feb 23 13:15:45 2016
@@ -42,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -170,7 +171,6 @@ public class AMQPConnection_0_8
private volatile int _currentClassId;
private volatile int _currentMethodId;
private final int _binaryDataLimit;
- private final long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
public AMQPConnection_0_8(Broker<?> broker,
@@ -193,9 +193,6 @@ public class AMQPConnection_0_8
? getBroker().getContextValue(String.class, Broker.SEND_QUEUE_DELETE_OK_REGARDLESS_CLIENT_VER_REGEXP): "";
_sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
- int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
- _maxMessageSize = (maxMessageSize > 0) ? (long) maxMessageSize : Long.MAX_VALUE;
-
_network = network;
_sender = network.getSender();
_closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
@@ -729,6 +726,7 @@ public class AMQPConnection_0_8
{
_messageCompressionThreshold = Integer.MAX_VALUE;
}
+
getSubject().getPrincipals().add(virtualHost.getPrincipal());
updateAccessControllerContext();
@@ -1345,11 +1343,6 @@ public class AMQPConnection_0_8
return _binaryDataLimit;
}
- public long getMaxMessageSize()
- {
- return _maxMessageSize;
- }
-
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org