You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/12/06 23:41:40 UTC

svn commit: r1211214 - /qpid/trunk/qpid/doc/book/src/LVQ.xml

Author: tross
Date: Tue Dec  6 22:41:40 2011
New Revision: 1211214

URL: http://svn.apache.org/viewvc?rev=1211214&view=rev
Log:
QPID-3663 - Updated the LVQ section of the C++ Broker book.

Modified:
    qpid/trunk/qpid/doc/book/src/LVQ.xml

Modified: qpid/trunk/qpid/doc/book/src/LVQ.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/LVQ.xml?rev=1211214&r1=1211213&r2=1211214&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/LVQ.xml (original)
+++ qpid/trunk/qpid/doc/book/src/LVQ.xml Tue Dec  6 22:41:40 2011
@@ -20,343 +20,163 @@
  
 -->
 
-<section><title>
-      LVQ
-    </title>
-
-    <section role="h2" id="LVQ-UnderstandingLVQ"><title>
-            Understanding LVQ
-          </title>
-          <para>
-            Last Value Queues are useful youUser Documentation are only
-            interested in the latest value entered into a queue. LVQ
-            semantics are typically used for things like stock symbol updates
-            when all you care about is the latest value for example.
-          </para><para>
-            Qpid C++ M4 or later supports two types of LVQ semantics:
-          </para><itemizedlist>
-            <listitem><para>LVQ
-            </para></listitem>
-            <listitem><para>LVQ_NO_BROWSE
-            </para></listitem>
-          </itemizedlist>
-    <!--h2--></section>
-
-
-	  <section role="h2" id="LVQ-LVQsemantics-3A"><title>
-            LVQ semantics:
-          </title>
+<section>
+  <title>LVQ - Last Value Queue</title>
+
+  <section role="h2" id="LVQ-UnderstandingLVQ">
+    <title>Understanding LVQ</title>
+    <para>
+      A Last Value Queue is configured with the name of a message header that
+      is used as a key.  The queue behaves as a normal FIFO queue with the
+      exception that when a message is enqueued, any other message in the
+      queue with the same value in the key header is removed and discarded.
+      Thus, for any given key value, the queue holds only the most recent
+      message.
+    </para>
+    <para>
+      The following example illustrates the operation of a Last Value Queue.
+      The example shows an empty queue with no consumers and a sequence of
+      produced messages.  The numbers represent the key for each message.
+    </para>
+    <programlisting>
+           &lt;empty queue&gt;
+      1 =>
+           1
+      2 =>
+           1 2
+      3 =>
+           1 2 3
+      4 =>
+           1 2 3 4
+      2 =>
+           1 3 4 2
+      1 =>
+           3 4 2 1
+    </programlisting>
+    <para>
+      Note that the first four messages are enqueued normally in FIFO order.
+      The fifth message has key '2' and is also enqueued on the tail of the
+      queue.  However the message already in the queue with the same key is
+      discarded.
+      <note>
+        <para>
+          If the set of keys used in the messages in a LVQ is constrained, the
+          number of messages in the queue shall not exceed the number of
+          distinct keys in use.
+        </para>
+      </note>
+    </para>
+    <section role="h3" id="LVQ-UnderstandingLVQ-UseCases">
+      <title>Common Use-Cases</title>
+      <itemizedlist>
+        <listitem>
           <para>
-            LVQ uses a header for a key, if the key matches it replaces the
-            message in-place in the queue except
-            a.) if the message with the matching key has been acquired
-            b.) if the message with the matching key has been browsed
-            In these two cases the message is placed into the queue in FIFO,
-            if another message with the same key is received it will the
-            'un-accessed' message with the same key will be replaced
-          </para><para>
-            These two exceptions protect the consumer from missing the last
-            update where a consumer or browser accesses a message and an
-            update comes with the same key.
-          </para><para>
-            An example
+            LVQ with zero or one consuming subscriptions - In this case, if
+            the consumer drops momentarily or is slower than the producer(s),
+            it will only receive current information relative to the message
+            keys.
           </para>
-            <programlisting>
-[localhost tests]$ ./lvqtest --mode create_lvq
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffdf3f3180
-Sending Data: key2=key2.0x7fffdf3f3180
-Sending Data: key3=key3.0x7fffdf3f3180
-Sending Data: key1=key1.0x7fffdf3f3180
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fffdf3f3180
-Receiving Data:key2.0x7fffdf3f3180
-Receiving Data:key3.0x7fffdf3f3180
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffe4c7fa10
-Sending Data: key2=key2.0x7fffe4c7fa10
-Sending Data: key3=key3.0x7fffe4c7fa10
-Sending Data: key1=key1.0x7fffe4c7fa10
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fffe4c7fa10
-Receiving Data:key2.0x7fffe4c7fa10
-Receiving Data:key3.0x7fffe4c7fa10
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode consume
-Receiving Data:key1.0x7fffdf3f3180
-Receiving Data:key2.0x7fffdf3f3180
-Receiving Data:key3.0x7fffdf3f3180
-Receiving Data:last
-Receiving Data:key1.0x7fffe4c7fa10
-Receiving Data:key2.0x7fffe4c7fa10
-Receiving Data:key3.0x7fffe4c7fa10
-Receiving Data:last
-</programlisting>
-<!--h2--></section>
-          <section role="h2" id="LVQ-LVQNOBROWSEsemantics-3A"><title>
-            LVQ_NO_BROWSE
-            semantics:
-          </title>
+        </listitem>
+        <listitem>
           <para>
-            LVQ uses a header for a key, if the key matches it replaces the
-            message in-place in the queue except
-            a.) if the message with the matching key has been acquired
-            In these two cases the message is placed into the queue in FIFO,
-            if another message with the same key is received it will the
-            'un-accessed' message with the same key will be replaced
-          </para><para>
-            Note, in this case browsed messaged are not invalidated, so
-            updates can be missed.
-          </para><para>
-            An example
+            LVQ with zero or more browsing subscriptions - A browsing consumer
+            can subscribe to the LVQ and get an immediate dump of all of the
+            "current" messages and track updates thereafter.  Any number of
+            independent browsers can subscribe to the same LVQ with the same
+            effect.  Since messages are never consumed, they only disappear
+            when replaced with a newer message with the same key or when their
+            TTL expires.
           </para>
-            <programlisting>
-[localhost tests]$ ./lvqtest --mode create_lvq_no_browse
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffce5fb390
-Sending Data: key2=key2.0x7fffce5fb390
-Sending Data: key3=key3.0x7fffce5fb390
-Sending Data: key1=key1.0x7fffce5fb390
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fff346ae440
-Sending Data: key2=key2.0x7fff346ae440
-Sending Data: key3=key3.0x7fff346ae440
-Sending Data: key1=key1.0x7fff346ae440
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fff346ae440
-Receiving Data:key2.0x7fff346ae440
-Receiving Data:key3.0x7fff346ae440
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fff346ae440
-Receiving Data:key2.0x7fff346ae440
-Receiving Data:key3.0x7fff346ae440
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fff606583e0
-Sending Data: key2=key2.0x7fff606583e0
-Sending Data: key3=key3.0x7fff606583e0
-Sending Data: key1=key1.0x7fff606583e0
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode consume
-Receiving Data:key1.0x7fff606583e0
-Receiving Data:key2.0x7fff606583e0
-Receiving Data:key3.0x7fff606583e0
-Receiving Data:last
-[localhost tests]$ 
-
-</programlisting>
-	  <!--h2--></section>
-          <section role="h2" id="LVQ-Examplesource"><title>
-            LVQ Program Example
-          </title>
-          
-            <programlisting>
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
- 
-
-#include &lt;qpid/client/AsyncSession.h&gt;
-#include &lt;qpid/client/Connection.h&gt;
-#include &lt;qpid/client/SubscriptionManager.h&gt;
-#include &lt;qpid/client/Session.h&gt;
-#include &lt;qpid/client/Message.h&gt;
-#include &lt;qpid/client/MessageListener.h&gt;
-#include &lt;qpid/client/QueueOptions.h&gt;
-
-#include &lt;iostream&gt;
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid;
-using namespace std;
-
-
-enum Mode { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME};
-const char* modeNames[] = { "create_lvq","create_lvq_no_browse","write","browse","consume" };
-
-// istream/ostream ops so Options can read/display Mode.
-istream&amp; operator&gt;&gt;(istream&amp; in, Mode&amp; mode) {
-    string s;
-    in &gt;&gt; s;
-    int i = find(modeNames, modeNames+5, s) - modeNames;
-    if (i &gt;= 5)  throw Exception("Invalid mode: "+s);
-    mode = Mode(i);
-    return in;
-}
-
-ostream&amp; operator&lt;&lt;(ostream&amp; out, Mode mode) {
-    return out &lt;&lt; modeNames[mode];
-}
-
-struct  Args : public qpid::Options,
-               public qpid::client::ConnectionSettings
-{
-    bool help;
-    Mode mode;
-
-    Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE)
-    {
-        using namespace qpid;
-        addOptions()
-            ("help", optValue(help), "Print this usage statement")
-            ("broker,b", optValue(host, "HOST"), "Broker host to connect to") 
-            ("port,p", optValue(port, "PORT"), "Broker port to connect to")
-            ("username", optValue(username, "USER"), "user name for broker log in.")
-            ("password", optValue(password, "PASSWORD"), "password for broker log in.")
-            ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
-            ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay")
-            ("mode", optValue(mode, "'see below'"), "Action mode."
-             "\ncreate_lvq: create a new queue of type lvq.\n"
-             "\ncreate_lvq_no_browse: create a new queue of type lvq with no lvq on browse.\n"
-             "\nwrite: write a bunch of data &amp; keys.\n"
-             "\nbrowse: browse the queue.\n"
-             "\nconsume: consume from the queue.\n");
-    }
-};
-
-class Listener : public MessageListener
-{
-  private:
-    Session session;
-    SubscriptionManager subscriptions;
-    std::string queue;
-    Message request;
-    QueueOptions args;
-  public:
-    Listener(Session&amp; session);
-    void setup(bool browse);
-    void send(std::string kv);
-    void received(Message&amp; message);
-    void browse(); 
-    void consume(); 
-};
-
-Listener::Listener(Session&amp; s) :
-    session(s), subscriptions(s),
-    queue("LVQtester")
-{}
-
-void Listener::setup(bool browse)
-{
-    // set queue mode
-    args.setOrdering(browse?LVQ_NO_BROWSE:LVQ);
-
-    session.queueDeclare(arg::queue=queue, arg::exclusive=false, arg::autoDelete=false, arg::arguments=args);
-
-}
-
-void Listener::browse()
-{
-    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED));    
-    subscriptions.run();
-}
-
-void Listener::consume()
-{
-    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED));    
-    subscriptions.run();
-}
-
-void Listener::send(std::string kv)
-{
-    request.getDeliveryProperties().setRoutingKey(queue);
-
-    std::string key;
-        args.getLVQKey(key);
-    request.getHeaders().setString(key, kv);
-
-    std::ostringstream data;
-    data &lt;&lt; kv;
-    if (kv != "last") data &lt;&lt; "." &lt;&lt; hex &lt;&lt; this;
-    request.setData(data.str());
-    
-    cout &lt;&lt; "Sending Data: " &lt;&lt; kv &lt;&lt; "=" &lt;&lt; data.str() &lt;&lt; std::endl;
-    async(session).messageTransfer(arg::content=request);
-    
-}
-
-void Listener::received(Message&amp; response) 
-{
-
-    cout &lt;&lt; "Receiving Data:" &lt;&lt; response.getData() &lt;&lt; std::endl;
-/*    if (response.getData() == "last"){
-        subscriptions.cancel(queue);
-    }
-*/
-}
-
-int main(int argc, char** argv) 
-{
-    Args opts;
-    opts.parse(argc, argv);
-
-    if (opts.help) {
-        std::cout &lt;&lt; opts &lt;&lt; std::endl;
-        return 0;
-    }
-
-    Connection connection;
-    try {
-        connection.open(opts);
-        Session session = connection.newSession();
-        Listener listener(session);
-        
-        switch (opts.mode)
-        {
-        case CONSUME:
-           listener.consume();
-           break;     
-        case BROWSE:
-           listener.browse();
-           break;     
-        case CREATE_LVQ:
-           listener.setup(false);
-           break;     
-        case CREATE_LVQ_NO_BROWSE:
-           listener.setup(true);
-           break;     
-        case WRITE:
-           listener.send("key1");
-           listener.send("key2");
-           listener.send("key3");
-           listener.send("key1");
-           listener.send("last");
-           break;     
-        }
-        connection.close();
-        return 0;
-    } catch(const std::exception&amp; error) {
-        std::cout &lt;&lt; error.what() &lt;&lt; std::endl;
-    }
-    return 1;
-}
-
-</programlisting>
-<!--h2--></section>
+        </listitem>
+      </itemizedlist>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Creating">
+    <title>Creating a Last Value Queue</title>
+    <section role="h3" id="LVQ-Creating-Address">
+      <title>Using Addressing Syntax</title>
+      <para>
+        A LVQ may be created using directives in the API's address syntax.
+        The important argument is "qpid.last_value_queue_key".  The following
+        Python example shows how a producer of stock price updates can create
+        a LVQ to hold the latest stock prices for each ticker symbol.  The
+        message header used to hold the ticker symbol is called "ticker".
+      </para>
+      <programlisting>
+    conn = Connection(url)
+    conn.open()
+    sess = conn.session()
+    tx = sess.sender("prices;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key':'ticker'}}}}")
+      </programlisting>
+    </section>
+    <section role="h3" id="LVQ-Creating-Tool">
+      <title>Using qpid-config</title>
+      <para>
+        The same LVQ as shown in the previous example can be created using the
+        qpid-config utility:
+      </para>
+      <programlisting>
+    $ qpid-config add queue prices --argument qpid.last_value_queue_key=ticker
+      </programlisting>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Example">
+    <title>LVQ Example</title>
+
+    <section role="h3" id="LVQ-Example-Sender">
+      <title>LVQ Sender</title>
+      <programlisting>
+    from qpid.messaging import Connection, Message
+
+    def send(sender, key, message):
+      message.properties["key"] = key
+      sender.send(message)
+
+    conn = Connection("localhost")
+    conn.open()
+    sess = conn.session()
+    tx = sess.sender("topic;{create:always, node:{type:queue,x-declare:{arguments:{'qpid.last_value_queue_key':key}}}}")
+
+    msg = Message("Content")
+    send(tx, "key1", msg);
+    send(tx, "key2", msg);
+    send(tx, "key3", msg);
+    send(tx, "key4", msg);
+    send(tx, "key2", msg);
+    send(tx, "key1", msg);
+
+    conn.close()
+      </programlisting>
+    </section>
+
+    <section role="h3" id="LVQ-Example-Receiver">
+      <title>LVQ Browsing Receiver</title>
+      <programlisting>
+    from qpid.messaging import Connection, Message
+    from time import sleep
+
+    conn = Connection("localhost")
+    conn.open()
+    sess = conn.session()
+    rx = sess.receiver("topic;{mode:browse}")
+
+    while True:
+      msg = rx.fetch()
+      sess.acknowledge()
+      print msg
+      </programlisting>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Deprecated">
+    <title>Deprecated LVQ Modes</title>
+    <para>
+      There are two legacy modes (still implemented as of Qpid 0.14)
+      controlled by the qpid.last_value_queue and
+      qpid.last_value_queue_no_browse argument values.  These modes are
+      intended to be deprecated and should not be used.
+    </para>
+  </section>
 </section>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org