You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/09/05 13:48:24 UTC

svn commit: r1759269 - in /qpid/java/trunk: doc/java-broker/src/docbkx/Java-Broker-Runtime.xml doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java

Author: rgodfrey
Date: Mon Sep  5 13:48:23 2016
New Revision: 1759269

URL: http://svn.apache.org/viewvc?rev=1759269&view=rev
Log:
QPID-7381 : Add documentation and a test for ADDR declaration

Added:
    qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml
      - copied, changed from r1758673, qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml
Modified:
    qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java

Modified: qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml?rev=1759269&r1=1759268&r2=1759269&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/Java-Broker-Runtime.xml Mon Sep  5 13:48:23 2016
@@ -28,6 +28,7 @@
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Handling-Undeliverable-Messages.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Close-On-No-Route.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Flow-To-Disk.xml"/>
+  <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Consumers.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Background-Recovery.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Message-Compression.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="runtime/Java-Broker-Runtime-Connection-Limit.xml"/>

Copied: qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml (from r1758673, qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml?p2=qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml&p1=qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml&r1=1758673&r2=1759269&rev=1759269&view=diff
==============================================================================
--- qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Flow-To-Disk.xml (original)
+++ qpid/java/trunk/doc/java-broker/src/docbkx/runtime/Java-Broker-Runtime-Consumers.xml Mon Sep  5 13:48:23 2016
@@ -21,25 +21,71 @@
 -->
 
 <section xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="Java-Broker-Runtime-Flow-To-Disk">
-  <title>Flow to Disk</title>
-  <para>Flow to disk limits the amount of <link linkend="Java-Broker-Runtime-Memory">direct and heap memory</link>
-    that can be occupied by messages. Once this
-    limit is reached any new transient messages and all existing transient messages will be
-    transferred to disk. Newly arriving transient messages will continue to go to the disk until the
-    cumulative size of all messages falls below the limit once again.</para>
-  <para>By default the Broker makes 40% of the max direct available memory for messages. This memory is
-    divided between all the queues across all virtual hosts defined on the Broker with a percentage
-    calculated according to their current queue size. These calculations are refreshed periodically
-    by the housekeeping cycle.</para>
-  <para>For example if there are two queues, one containing 75MB and the second 100MB messages
-    respectively and the Broker has 1GB direct memory with the default of 40% available for messages.
-    The first queue will have a target size of 170MB and the second 230MB. Once 400MB is taken by
-    messages, messages will begin to flow to disk. New messages will cease to flow to disk when
-    their cumulative size falls beneath 400MB.</para>
-  <para>Flow to disk is configured by Broker context variable
-      <literal>broker.flowToDiskThreshold</literal>. It is expressed as a size in bytes and defaults
-    to 40% of the JVM maximum heap size.</para>
-  <para>Log message <link linkend="Java-Broker-Appendix-Operation-Logging-Message-BRK-1014">BRK-1014</link> is written when the feature activates. Once the total space of all messages
-    decreases below the threshold, the message <link linkend="Java-Broker-Appendix-Operation-Logging-Message-BRK-1015">BRK-1015</link> is written
-    to show that the feature is no longer active.</para>
+  <title>Consumers</title>
+
+  <para>A Consumer is created when an AMQP connection wishes to receive messages from a message source (such as a
+    Queue).  The standard behaviours of consumers are defined by the respective AMQP specification, however in addition
+    to the standard behaviours a number of Qpid specific enhancements are available</para>
+
+  <section xml:id="Java-Broker-Runtime-Consumers-Prioirty">
+    <title>Priority</title>
+    <para>By default, when there are multiple competing consumers attached to the same message source, the Broker
+      attempts to distribute messages from the queue in a "fair" manner. Some use cases require allocation of messages
+      to consumers to be based on the "priority" of the consumer. Where there are multiple consumers having differing
+      priorities, the Broker will always attempt to deliver a message to a higher priority consumer before attempting
+      delivery to a lower priority consumer. That is, a lower priority consumer will only receive a message if no
+      higher priority consumers currently have credit available to consume the message, or those consumers have declined
+      to accept the message (for instance because it does not meet the criteria of any selectors associated with the
+      consumer).</para>
+    <para>Where a consumer is created with no explicit priority provided, the consumer is given the highest possible
+      priority.</para>
+    <section>
+      <title>Creating a Consumer with a non-standard priority</title>
+      <para>
+        In AMQP 0-9 and 0-9-1 the priority of the consumer can be set by adding an entry into the table provided as the
+        <literal>arguments</literal> field (known as the <literal>filter</literal> field on AMQP 0-9) of the
+        <literal>basic.consume</literal> method. The key for the entry must be the literal short string
+        <literal>x-priority</literal>, and the value of the entry must be an integral number in the range
+        -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+      </para>
+      <para>
+        In AMQP 0-10 the priority of the consumer can be set in the map provided as the <literal>arguments</literal>
+        field of the <literal>message.subscribe</literal> method. The key for the entry must be the literal string
+        <literal>x-priority</literal>, and the value of the entry must be an integral number in the range
+        -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+      </para>
+      <para>
+        In AMQP 1.0 the priority of the consumer is set in the <literal>properties</literal> map of the
+        <literal>attach</literal> frame where the broker side of the link represents the sending side of the link.
+        The key for the entry must be the literal string <literal>priority</literal>, and the value of the entry must
+        be an integral number in the range -2<superscript>31</superscript> to 2<superscript>31</superscript>-1.
+      </para>
+      <para>
+        When using the Qpid JMS client for AMQP 0-9/0-9-1/0-10 the consumer priority can be set in the address being
+        used for the Destination object.
+
+        <table>
+          <title>Setting the consumer priority</title>
+          <tgroup cols="2">
+            <thead>
+              <row>
+                <entry>Syntax</entry>
+                <entry>Example</entry>
+              </row>
+            </thead>
+            <tbody>
+              <row>
+                <entry>Addressing</entry>
+                <entry>myqueue : { link : { x-subscribe: { arguments : { x-priority : '10' } } } }</entry>
+              </row>
+              <row>
+                <entry>Binding URL</entry>
+                <entry>direct://amq.direct/myqueue/myqueue?x-qpid-replay-priority='10'</entry>
+              </row>
+            </tbody>
+          </tgroup>
+        </table>
+      </para>
+    </section>
+  </section>
 </section>

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java?rev=1759269&r1=1759268&r2=1759269&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ConsumerPriorityTest.java Mon Sep  5 13:48:23 2016
@@ -74,6 +74,17 @@ public class ConsumerPriorityTest extend
     public void testLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityConsumerAvailable() throws Exception
     {
         Queue queue = _consumingSession.createQueue("direct://amq.direct/" + getTestQueueName() + "/" + getTestQueueName() + "?x-priority='10'");
+        doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(queue);
+    }
+
+    public void testLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityConsumerAvailableUsingADDR() throws Exception
+    {
+        Queue queue = _consumingSession.createQueue("ADDR:" + getTestQueueName() + "; { create: always, node: { type: queue }, link : { x-subscribe: { arguments : { x-priority : '10' } } } }");
+        doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(queue);
+    }
+
+    private void doTestLowPriorityConsumerDoesNotReceiveMessagesIfHigherPriorityAvailable(final Queue queue) throws Exception
+    {
         final MessageConsumer consumer = _consumingSession.createConsumer(queue);
         assertNull("There should be no messages in the queue", consumer.receive(100L));
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org