You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/06 17:45:44 UTC

svn commit: r711903 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/resuming_receiver.cpp src/qpid/client/MessageReplayTracker.cpp src/qpid/client/MessageReplayTracker.h src/tests/MessageReplayTracker.cpp

Author: gsim
Date: Thu Nov  6 08:45:27 2008
New Revision: 711903

URL: http://svn.apache.org/viewvc?rev=711903&view=rev
Log:
* fix bug causing last message to occasionally be lost on replay
* make presence of gaps an error condition in the resuming_receiver example
* add ability to apply functor to replay buffer


Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp Thu Nov  6 08:45:27 2008
@@ -41,14 +41,16 @@
     Listener();
     void received(Message& message);
     void execute(AsyncSession& session, bool isRetry);
+    void check();
   private:
     Subscription subscription;
     uint count;
     uint skipped;
     uint lastSn;
+    bool gaps;
 };
 
-Listener::Listener() : count(0), skipped(0), lastSn(0) {}
+Listener::Listener() : count(0), skipped(0), lastSn(0), gaps(false) {}
 
 void Listener::received(Message & message) 
 {
@@ -62,7 +64,8 @@
         uint sn = message.getHeaders().getAsInt("sn");
         if (lastSn < sn) {
             if (sn - lastSn > 1) {
-                std::cout << "Warning: gap in sequence between " << lastSn << " and " << sn << std::endl;
+                std::cout << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+                gaps = true;
             }
             lastSn = sn;
             ++count;
@@ -72,6 +75,11 @@
     }
 }
 
+void Listener::check()
+{
+    if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost.");
+}
+
 void Listener::execute(AsyncSession& session, bool isRetry)
 {
     if (isRetry) {
@@ -94,6 +102,7 @@
     try {
         connection.execute(listener);
         connection.close();
+        listener.check();
         std::cout << "Completed without error." << std::endl;
         return 0;
     } catch(const std::exception& error) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp Thu Nov  6 08:45:27 2008
@@ -28,8 +28,8 @@
 
 void MessageReplayTracker::send(const Message& message, const std::string& destination)
 {
-    ReplayRecord record(message, destination);
-    record.send(*this);
+    buffer.push_back(ReplayRecord(message, destination));    
+    buffer.back().send(*this);
     if (flushInterval && ++count >= flushInterval) {
         checkCompletion();
         if (!buffer.empty()) session.flush();
@@ -70,7 +70,6 @@
 void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker)
 {
     status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message);
-    tracker.buffer.push_back(*this);    
 }
 
 bool MessageReplayTracker::ReplayRecord::isComplete()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h Thu Nov  6 08:45:27 2008
@@ -44,6 +44,13 @@
     void setFlushInterval(uint interval);
     uint getFlushInterval();
     void checkCompletion();
+
+    template <class F> void foreach(F& f) {
+        for (std::list<ReplayRecord>::const_iterator i = buffer.begin(); i != buffer.end(); i++) {
+            f(i->message);
+        }
+    }
+
   private:
     struct ReplayRecord
     {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp Thu Nov  6 08:45:27 2008
@@ -29,6 +29,23 @@
 using namespace qpid::sys;
 using std::string;
 
+class ReplayBufferChecker
+{
+  public:
+
+    ReplayBufferChecker(uint from, uint to) : end(to), i(from) {}
+
+    void operator()(const Message& m)
+    {
+        if (i > end) BOOST_FAIL("Extra message found: " + m.getData());
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i++)).str(), m.getData());
+    }
+  private:
+    const uint end;
+    uint i;
+
+};
+
 QPID_AUTO_TEST_CASE(testReplay)
 {
     ProxySessionFixture fix;
@@ -40,6 +57,9 @@
         Message message((boost::format("Message_%1%") % (i+1)).str(), "my-queue");        
         tracker.send(message);
     }
+    ReplayBufferChecker checker(1, 10);
+    tracker.foreach(checker);
+
     tracker.replay(fix.session);
     for (uint j = 0; j < 2; j++) {//each message should have been sent twice
         for (uint i = 0; i < 5; i++) {