You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/02/18 22:54:05 UTC

svn commit: r628875 - in /incubator/qpid/trunk/qpid: bin/ cpp/examples/examples/fanout/ cpp/examples/examples/pub-sub/ cpp/src/qpid/client/

Author: aconway
Date: Mon Feb 18 13:54:02 2008
New Revision: 628875

URL: http://svn.apache.org/viewvc?rev=628875&view=rev
Log:
Fixed race condition in the examples: when a listener program prints
its "ready" message, the commands it has sent to the broker may not yet
be complete. This results in sporadic lost messages if the producer is
started immediately (e.g. by a script.)

 - Added Session::sync(), wait till all commands to date have completed.
 - Call sync() before printing "ready" in listener example programs
 - Removed sleep from verify script

Modified:
    incubator/qpid/trunk/qpid/bin/verify
    incubator/qpid/trunk/qpid/cpp/examples/examples/fanout/listener.cpp
    incubator/qpid/trunk/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h

Modified: incubator/qpid/trunk/qpid/bin/verify
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/bin/verify?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/bin/verify (original)
+++ incubator/qpid/trunk/qpid/bin/verify Mon Feb 18 13:54:02 2008
@@ -38,8 +38,6 @@
     out=`outfile $*`
     eval "$* $ARGS > $out &" || { fail; return 1; }
     waitfor $out "$pattern"
-    # printing the ready message doesn't guarnatee we're ready, so sleep a bit.
-    sleep 1
 }
 
 name() {
@@ -81,7 +79,7 @@
 fi
 
 for example in "$@"; do
-    echo -n "== $example "
+    echo "== $example "
     if ( verify $example; ) then echo "PASS";  else echo "FAIL"; RET=1; fi
      done
 exit $RET

Modified: incubator/qpid/trunk/qpid/cpp/examples/examples/fanout/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/fanout/listener.cpp?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/fanout/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/fanout/listener.cpp Mon Feb 18 13:54:02 2008
@@ -50,12 +50,12 @@
 {}
 
 void Listener::received(Message& message) {
-  std::cout << "Message: " << message.getData() << std::endl;
-  if (message.getData() == "That's all, folks!") {
-      std::cout << "Shutting down listener for " << message.getDestination()
-                << std::endl;
-      subscriptions.cancel(message.getDestination());
-  }
+    std::cout << "Message: " << message.getData() << std::endl;
+    if (message.getData() == "That's all, folks!") {
+        std::cout << "Shutting down listener for " << message.getDestination()
+                  << std::endl;
+        subscriptions.cancel(message.getDestination());
+    }
 }
 
 int main(int argc, char** argv) {
@@ -64,35 +64,38 @@
     Connection connection;
     Message msg;
     try {
-      connection.open(host, port);
-      Session session =  connection.newSession();
+        connection.open(host, port);
+        Session session =  connection.newSession();
 
-  //--------- Main body of program --------------------------------------------
+        //--------- Main body of program --------------------------------------------
 
-      // Unique name for private queue:
-      std::string myQueue=session.getId().str();
-      // Declear my queue. 
-      session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
-                           arg::autoDelete=true);
-      // Bind my queue to the fanout exchange.
-      // Note no routingKey required, the fanout exchange delivers
-      // all messages to all bound queues unconditionally.
-      session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
-
-      // Create a listener and subscribe it to my queue.
-      SubscriptionManager subscriptions(session);
-      Listener listener(subscriptions);
-      subscriptions.subscribe(listener, myQueue);
-
-      // Deliver messages until the subscription is cancelled
-      // by Listener::received()
-      std::cout << "Listening" << std::endl;
-      subscriptions.run();
+        // Unique name for private queue:
+        std::string myQueue=session.getId().str();
+        // Declear my queue. 
+        session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
+                             arg::autoDelete=true);
+        // Bind my queue to the fanout exchange.
+        // Note no routingKey required, the fanout exchange delivers
+        // all messages to all bound queues unconditionally.
+        session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+
+        // Create a listener and subscribe it to my queue.
+        SubscriptionManager subscriptions(session);
+        Listener listener(subscriptions);
+        subscriptions.subscribe(listener, myQueue);
+
+        // Wait for the broker to indicate that our queues have been created.
+        session.sync();
+
+        // Deliver messages until the subscription is cancelled
+        // by Listener::received()
+        std::cout << "Listening" << std::endl;
+        subscriptions.run();
 
-  //---------------------------------------------------------------------------
+        //---------------------------------------------------------------------------
 
-      connection.close();
-      return 0;
+        connection.close();
+        return 0;
     } catch(const std::exception& error) {
         std::cout << error.what() << std::endl;
     }

Modified: incubator/qpid/trunk/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp Mon Feb 18 13:54:02 2008
@@ -128,7 +128,8 @@
 }
 
 void Listener::listen() {
-  subscriptions.run();
+    // Receive messages
+    subscriptions.run();
 }
 
 int main(int argc, char** argv) {
@@ -151,6 +152,9 @@
 	listener.prepareQueue("europe", "europe.#");
 	listener.prepareQueue("news", "#.news");
 	listener.prepareQueue("weather", "#.weather");
+
+        // Wait for the broker to indicate that our queues have been created.
+        session.sync();
 
 	std::cout << "Listening for messages ..." << std::endl;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Feb 18 13:54:02 2008
@@ -41,6 +41,8 @@
     virtual bool isComplete(const framing::SequenceNumber& id) = 0;
     virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
     virtual void setCompletionListener(boost::function<void()>) = 0;
+    virtual void syncWait(const framing::SequenceNumber& id) = 0;
+    virtual framing::SequenceNumber lastSent() const = 0;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Feb 18 13:54:02 2008
@@ -26,6 +26,8 @@
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/all_method_bodies.h"
 #include "qpid/framing/ServerInvoker.h"
+#include "qpid/client/FutureCompletion.h"
+#include <boost/bind.hpp>
 
 using namespace qpid::client;
 using namespace qpid::framing;
@@ -251,4 +253,15 @@
 void ExecutionHandler::setCompletionListener(boost::function<void()> l)
 {
     completionListener = l;
+}
+
+
+void ExecutionHandler::syncWait(const SequenceNumber& id) {
+    syncTo(id);
+    FutureCompletion fc;
+    completion.listenForCompletion(
+        id, boost::bind(&FutureCompletion::completed, &fc)
+    );
+    fc.waitForCompletion();
+    assert(isCompleteUpTo(id));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Feb 18 13:54:02 2008
@@ -86,6 +86,7 @@
     void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
     void syncTo(const framing::SequenceNumber& point);
     void flushTo(const framing::SequenceNumber& point);
+    void syncWait(const framing::SequenceNumber& id);
 
     bool isComplete(const framing::SequenceNumber& id);
     bool isCompleteUpTo(const framing::SequenceNumber& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp Mon Feb 18 13:54:02 2008
@@ -34,4 +34,11 @@
 Execution& SessionBase::getExecution() { return impl->getExecution(); }
 Uuid SessionBase::getId() const { return impl->getId(); }
 framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+
+void SessionBase::sync() {
+    Execution& ex = getExecution();
+    ex.syncWait(ex.lastSent());
+    impl->assertOpen();
+}
+
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h?rev=628875&r1=628874&r2=628875&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h Mon Feb 18 13:54:02 2008
@@ -84,6 +84,11 @@
     
     /** Close the session */
     void close();
+
+    /** Synchronize with the broker. Wait for all commands issued so far in
+     * the session to complete.
+     */
+    void sync();
     
     Execution& getExecution();