You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/06 17:45:44 UTC
svn commit: r711903 - in /incubator/qpid/trunk/qpid/cpp:
examples/failover/resuming_receiver.cpp
src/qpid/client/MessageReplayTracker.cpp
src/qpid/client/MessageReplayTracker.h src/tests/MessageReplayTracker.cpp
Author: gsim
Date: Thu Nov 6 08:45:27 2008
New Revision: 711903
URL: http://svn.apache.org/viewvc?rev=711903&view=rev
Log:
* Fix a bug that occasionally caused the last message to be lost on replay
* Make the presence of gaps an error condition in the resuming_receiver example
* Add the ability to apply a functor to the replay buffer
Modified:
incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp Thu Nov 6 08:45:27 2008
@@ -41,14 +41,16 @@
Listener();
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
+ void check();
private:
Subscription subscription;
uint count;
uint skipped;
uint lastSn;
+ bool gaps;
};
-Listener::Listener() : count(0), skipped(0), lastSn(0) {}
+Listener::Listener() : count(0), skipped(0), lastSn(0), gaps(false) {}
void Listener::received(Message & message)
{
@@ -62,7 +64,8 @@
uint sn = message.getHeaders().getAsInt("sn");
if (lastSn < sn) {
if (sn - lastSn > 1) {
- std::cout << "Warning: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ std::cout << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ gaps = true;
}
lastSn = sn;
++count;
@@ -72,6 +75,11 @@
}
}
+void Listener::check()
+{
+ if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost.");
+}
+
void Listener::execute(AsyncSession& session, bool isRetry)
{
if (isRetry) {
@@ -94,6 +102,7 @@
try {
connection.execute(listener);
connection.close();
+ listener.check();
std::cout << "Completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp Thu Nov 6 08:45:27 2008
@@ -28,8 +28,8 @@
void MessageReplayTracker::send(const Message& message, const std::string& destination)
{
- ReplayRecord record(message, destination);
- record.send(*this);
+ buffer.push_back(ReplayRecord(message, destination));
+ buffer.back().send(*this);
if (flushInterval && ++count >= flushInterval) {
checkCompletion();
if (!buffer.empty()) session.flush();
@@ -70,7 +70,6 @@
void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker)
{
status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message);
- tracker.buffer.push_back(*this);
}
bool MessageReplayTracker::ReplayRecord::isComplete()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h Thu Nov 6 08:45:27 2008
@@ -44,6 +44,13 @@
void setFlushInterval(uint interval);
uint getFlushInterval();
void checkCompletion();
+
+ template <class F> void foreach(F& f) {
+ for (std::list<ReplayRecord>::const_iterator i = buffer.begin(); i != buffer.end(); i++) {
+ f(i->message);
+ }
+ }
+
private:
struct ReplayRecord
{
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp?rev=711903&r1=711902&r2=711903&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageReplayTracker.cpp Thu Nov 6 08:45:27 2008
@@ -29,6 +29,23 @@
using namespace qpid::sys;
using std::string;
+class ReplayBufferChecker
+{
+ public:
+
+ ReplayBufferChecker(uint from, uint to) : end(to), i(from) {}
+
+ void operator()(const Message& m)
+ {
+ if (i > end) BOOST_FAIL("Extra message found: " + m.getData());
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i++)).str(), m.getData());
+ }
+ private:
+ const uint end;
+ uint i;
+
+};
+
QPID_AUTO_TEST_CASE(testReplay)
{
ProxySessionFixture fix;
@@ -40,6 +57,9 @@
Message message((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
tracker.send(message);
}
+ ReplayBufferChecker checker(1, 10);
+ tracker.foreach(checker);
+
tracker.replay(fix.session);
for (uint j = 0; j < 2; j++) {//each message should have been sent twice
for (uint i = 0; i < 5; i++) {