You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/09/05 13:48:24 UTC
svn commit: r1759269 - in /qpid/java/trunk:
doc/java-broker/src/docbkx/Java-Broker-Runtime.xml
doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml
systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java
Author: rgodfrey
Date: Mon Sep 5 13:48:23 2016
New Revision: 1759269
URL: http://svn.apache.org/viewvc?rev=1759269&view=rev
Log:
QPID-7381 : Add documentation and a test for ADDR declaration
Added:
qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml
- copied, changed from r1758673, qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
Modified:
qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java
Modified: qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml?rev=1759269&r1=1759268&r2=1759269&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml Mon Sep 5 13:48:23 2016
@@ -28,6 +28,7 @@
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Handling-Undeliverable-Messages.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Close-On-No-Route.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Flow-To-Disk.xml"/>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Consumers.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Background-Recovery.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Message-Compression.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Connection-Limit.xml"/>
Copied: qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml (from r1758673, qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml?p2=qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml&p1=qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml&r1=1758673&r2=1759269&rev=1759269&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml Mon Sep 5 13:48:23 2016
@@ -21,25 +21,71 @@
-->
<section xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="Java-Broker-Runtime-Flow-To-Disk">
- <title>Flow to Disk</title>
- <para>Flow to disk limits the amount of <link linkend="Java-Broker-Runtime-Memory">direct and heap memory</link>
- that can be occupied by messages. Once this
- limit is reached any new transient messages and all existing transient messages will be
- transferred to disk. Newly arriving transient messages will continue to go to the disk until the
- cumulative size of all messages falls below the limit once again.</para>
- <para>By default the Broker makes 40% of the max direct available memory for messages. This memory is
- divided between all the queues across all virtual hosts defined on the Broker with a percentage
- calculated according to their current queue size. These calculations are refreshed periodically
- by the housekeeping cycle.</para>
- <para>For example if there are two queues, one containing 75MB and the second 100MB messages
- respectively and the Broker has 1GB direct memory with the default of 40% available for messages.
- The first queue will have a target size of 170MB and the second 230MB. Once 400MB is taken by
- messages, messages will begin to flow to disk. New messages will cease to flow to disk when
- their cumulative size falls beneath 400MB.</para>
- <para>Flow to disk is configured by Broker context variable
- <literal>broker.flowToDiskThreshold</literal>. It is expressed as a size in bytes and defaults
- to 40% of the JVM maximum heap size.</para>
- <para>Log message <link linkend="Java-Broker-Appendix-Operation-Logging-Message-BRK-1014">BRK-1014</link> is written when the feature activates. Once the total space of all messages
- decreases below the threshold, the message <link linkend="Java-Broker-Appendix-Operation-Logging-Message-BRK-1015">BRK-1015</link> is written
- to show that the feature is no longer active.</para>
+ <title>Consumers</title>
+
+ <para>A Consumer is created when an AMQP connection wishes to receive messages from a message source (such as a
+ Queue). The standard behaviours of consumers are defined by the respective AMQP specification, however in addition
+ to the standard behaviours a number of Qpid specific enhancements are available</para>
+
+ <section xml:id="Java-Broker-Runtime-Consumers-Prioirty">
+ <title>Priority</title>
+ <para>By default, when there are multiple competing consumers attached to the same message source, the Broker
+ attempts to distribute messages from the queue in a "fair" manner. Some use cases require allocation of messages
+ to consumers to be based on the "priority" of the consumer. Where there are multiple consumers having differing
+ priorities, the Broker will always attempt to deliver a message to a higher priority consumer before attempting
+ delivery to a lower priority consumer. That is, a lower priority consumer will only receive a message if no
+ higher priority consumers currently have credit available to consume the message, or those consumers have declined
+ to accept the message (for instance because it does not meet the criteria of any selectors associated with the
+ consumer).</para>
+ <para>Where a consumer is created with no explicit priority provided, the consumer is given the highest possible
+ priority.</para>
+ <section>
+ <title>Creating a Consumer with a non-standard priority</title>
+ <para>
+ In AMQP 0-9 and 0-9-1 the priority of the consumer can be set by adding an entry into the table provided as the
+ <literal>arguments</literal> field (known as the <literal>filter</literal> field on AMQP 0-9) of the
+ <literal>basic.consume</literal> method. The key for the entry must be the literal short string
+ <literal>x-priority</literal>, and the value of the entry must be an integral number in the range
+ -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+ </para>
+ <para>
+ In AMQP 0-10 the priority of the consumer can be set in the map provided as the <literal>arguments</literal>
+ field of the <literal>message.subscribe</literal> method. The key for the entry must be the literal string
+ <literal>x-priority</literal>, and the value of the entry must be an integral number in the range
+ -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+ </para>
+ <para>
+ In AMQP 1.0 the priority of the consumer is set in the <literal>properties</literal> map of the
+ <literal>attach</literal> frame where the broker side of the link represents the sending side of the link.
+ The key for the entry must be the literal string <literal>priority</literal>, and the value of the entry must
+ be an integral number in the range -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+ </para>
+ <para>
+ When using the Qpid JMS client for AMQP 0-9/0-9-1/0-10 the consumer priority can be set in the address being
+ used for the Destination object.
+
+ <table>
+ <title>Setting the consumer priority</title>
+ <tgroup cols="2">
+ <thead>
+ <row>
+ <entry>Syntax</entry>
+ <entry>Example</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>Addressing</entry>
+ <entry>myqueue : { link : { x-subscribe: { arguments : { x-priority : '10' } } } }</entry>
+ </row>
+ <row>
+ <entry>Binding URL</entry>
+ <entry>direct://amq.direct/myqueue/myqueue?x-qpid-replay-priority='10'</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </para>
+ </section>
+ </section>
</section>
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java?rev=1759269&r1=1759268&r2=1759269&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java Mon Sep 5 13:48:23 2016
@@ -74,6 +74,17 @@ public class ConsumerPriorityTest extend
public void testLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityConsumerAvailable() throws Exception
{
Queue queue = _consumingSession.createQueue("direct://amq.direct/" + getTestQueueName() + "/" + getTestQueueName() + "?x-priority='10'");
+ doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(queue);
+ }
+
+ public void testLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityConsumerAvailableUsingADDR() throws Exception
+ {
+ Queue queue = _consumingSession.createQueue("ADDR:" + getTestQueueName() + "; { create: always, node: { type: queue }, link : { x-subscribe: { arguments : { x-priority : '10' } } } }");
+ doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(queue);
+ }
+
+ private void doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(final Queue queue) throws Exception
+ {
final MessageConsumer consumer = _consumingSession.createConsumer(queue);
assertNull("There should be no messages in the queue", consumer.receive(100L));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org