You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/30 22:07:29 UTC
svn commit: r709242 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/
src/qpid/client/ src/qpid/cluster/ src/tests/ xml/
Author: aconway
Date: Thu Oct 30 14:07:28 2008
New Revision: 709242
URL: http://svn.apache.org/viewvc?rev=709242&view=rev
Log:
Replicate session state for un-acknowledged messages to new cluster members.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 30 14:07:28 2008
@@ -380,15 +380,12 @@
};
}// namespace
-bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
- if (i == messages.end())
- return false;
- else {
- msg = *i;
- return true;
- }
+ if (i != messages.end())
+ return *i;
+ return QueuedMessage();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -876,9 +873,6 @@
}
void Queue::setPosition(SequenceNumber n) {
- if (n <= sequence)
- throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
- << " for queue " << name));
+ Mutex::ScopedLock locker(messageLock);
sequence = n;
- --sequence; // Decrement so ++sequence will return n.
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 30 14:07:28 2008
@@ -230,12 +230,8 @@
*/
QueuedMessage get();
- /** Get the message at position pos
- *@param msg out parameter, assigned to the message found.
- *@param pos position to search for.
- *@return True if there is a message at pos, false otherwise.
- */
- bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+ /** Get the message at position pos */
+ QueuedMessage find(framing::SequenceNumber pos) const;
const QueuePolicy* getPolicy();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Oct 30 14:07:28 2008
@@ -100,10 +100,12 @@
void readyToSend();
+ // Used by cluster to create replica sessions.
template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
+ template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); }
SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); }
-
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ void record(const DeliveryRecord& delivery) { semanticState.record(delivery); }
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Thu Oct 30 14:07:28 2008
@@ -33,12 +33,12 @@
LocalQueue::LocalQueue() {}
LocalQueue::~LocalQueue() {}
-Message LocalQueue::pop() { return get(); }
+Message LocalQueue::pop(sys::Duration timeout) { return get(timeout); }
-Message LocalQueue::get() {
+Message LocalQueue::get(sys::Duration timeout) {
Message result;
- bool ok = get(result, sys::TIME_INFINITE);
- assert(ok); (void) ok;
+ bool ok = get(result, timeout);
+ if (!ok) throw Exception("Timed out waiting for a message");
return result;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Thu Oct 30 14:07:28 2008
@@ -56,14 +56,14 @@
*/
bool get(Message& result, sys::Duration timeout=0);
- /** Get the next message off the local queue, or wait for a
- * message from the broker queue.
- *@exception ClosedException if subscription has been closed.
+ /** Get the next message off the local queue, or wait up to the timeout
+ * for a message from the broker queue.
+ *@exception ClosedException if subscription is closed; Exception if the timeout is exceeded.
*/
- Message get();
+ Message get(sys::Duration timeout=sys::TIME_INFINITE);
/** Synonym for get(). */
- Message pop();
+ Message pop(sys::Duration timeout=sys::TIME_INFINITE);
/** Return true if local queue is empty. */
bool empty() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Oct 30 14:07:28 2008
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "DumpClient.h"
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
@@ -73,8 +74,6 @@
output.deliverDoOutput(requested);
}
-// FIXME aconway 2008-10-15: changes here, dubious.
-
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -214,6 +213,50 @@
return self.first == cluster.getId() && self.second == 0;
}
+void Connection::deliveryRecord(const string& qname,
+ const SequenceNumber& position,
+ const string& tag,
+ const SequenceNumber& id,
+ bool acquired,
+ bool accepted,
+ bool cancelled,
+ bool completed,
+ bool ended,
+ bool windowing)
+{
+ broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+ if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
+
+ if (!ended) { // Has a message
+ if (acquired) // Message at front of dump queue
+ m = dumpQueue->get();
+ else // Message at original position in original queue
+ m = queue->find(position);
+ if (!m.payload)
+ throw Exception(QPID_MSG("deliveryRecord no dump message"));
+ }
+
+ broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing);
+ dr.setId(id);
+ if (cancelled) dr.cancel(dr.getTag());
+ if (completed) dr.complete();
+ if (ended) dr.setEnded(); // Ended record: no message attached
+
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ assert(s);
+ s->record(dr);
+}
+
+void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+}
+
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
@@ -222,5 +265,7 @@
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
+
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Oct 30 14:07:28 2008
@@ -94,17 +94,30 @@
// ==== Used in catch-up mode to build initial state.
//
// State dump methods.
- void sessionState(const SequenceNumber& replayStart,
- const SequenceNumber& sendCommandPoint,
- const SequenceSet& sentIncomplete,
- const SequenceNumber& expected,
- const SequenceNumber& received,
- const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ void sessionState(const framing::SequenceNumber& replayStart,
+ const framing::SequenceNumber& sendCommandPoint,
+ const framing::SequenceSet& sentIncomplete,
+ const framing::SequenceNumber& expected,
+ const framing::SequenceNumber& received,
+ const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
void shadowReady(uint64_t memberId, uint64_t connectionId);
void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void deliveryRecord(const std::string& queue,
+ const framing::SequenceNumber& position,
+ const std::string& tag,
+ const framing::SequenceNumber& id,
+ bool acquired,
+ bool accepted,
+ bool cancelled,
+ bool completed,
+ bool ended,
+ bool windowing);
+
+ void queuePosition(const std::string&, const framing::SequenceNumber&);
+
private:
bool catcUp;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 30 14:07:28 2008
@@ -56,14 +56,11 @@
using client::SessionBase_0_10Access;
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
- ClusterConnectionProxy(client::Connection& c) :
+ ClusterConnectionProxy(client::Connection c) :
AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+ ClusterConnectionProxy(client::AsyncSession s) :
+ AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {}
};
-struct ClusterProxy : public AMQP_AllProxy::Cluster {
- ClusterProxy(client::Connection& c) :
- AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {}
-};
-
// Create a connection with special version that marks it as a catch-up connection.
client::Connection catchUpConnection() {
@@ -73,7 +70,7 @@
}
// Send a control body directly to the session.
-void send(client::Session& s, const AMQBody& body) {
+void send(client::AsyncSession& s, const AMQBody& body) {
client::SessionBase_0_10Access sb(s);
sb.get()->send(body);
}
@@ -94,19 +91,23 @@
DumpClient::~DumpClient() {}
-// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
-static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
-static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
+// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+static const char DUMP_CHARS[] = "\000qpid-dump";
+const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS));
void DumpClient::dump() {
QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
Broker& b = dumperBroker;
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
- // Catch-up exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
+
+ // Dump exchange is used to route messages to the proper queue without modifying routing key.
+ session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+ session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
session.sync();
session.close();
+
std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
AMQFrame frame(map.asMethodBody());
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -134,6 +135,39 @@
arg::arguments=ex->getArgs());
}
+/** Bind a queue to the dump exchange and dump messages to it
+ * setting the message position as needed.
+ */
+class MessageDumper {
+ std::string queue;
+ bool haveLastPos;
+ framing::SequenceNumber lastPos;
+ client::AsyncSession session;
+
+ public:
+
+ MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ session.exchangeBind(queue, DumpClient::DUMP);
+ }
+
+ ~MessageDumper() {
+ session.exchangeUnbind(queue, DumpClient::DUMP);
+ }
+
+ void dump(const broker::QueuedMessage& message) {
+ if (!haveLastPos || message.position - lastPos != 1) {
+ ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
+ haveLastPos = true;
+ }
+ lastPos = message.position;
+ SessionBase_0_10Access sb(session);
+ framing::MessageTransferBody transfer(
+ framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sb.get()->send(transfer, message.payload->getFrames());
+ }
+};
+
+
void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
session.queueDeclare(
q->getName(),
@@ -143,19 +177,11 @@
arg::exclusive=q->hasExclusiveConsumer(),
arg::autoDelete=q->isAutoDelete(),
arg::arguments=q->getSettings());
-
- session.exchangeBind(q->getName(), CATCH_UP, std::string());
- q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1));
- session.exchangeUnbind(q->getName(), CATCH_UP, std::string());
+ MessageDumper dumper(q->getName(), session);
+ q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
}
-void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
- SessionBase_0_10Access sb(session);
- framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
- sb.get()->send(transfer, message.payload->getFrames());
-}
void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
@@ -190,11 +216,11 @@
// Re-create session state on remote connection.
- // For reasons unknown, boost::bind does not work here with boost 1.33.
+ // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+ ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
-
// Adjust for message in progress, will be sent after state update.
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -221,7 +247,7 @@
QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
}
-void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
@@ -246,5 +272,27 @@
client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
+
+void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
+ assert(dr.isEnded() || dr.getMessage().payload);
+
+ if (!dr.isEnded() && dr.isAcquired()) {
+ // If the message is acquired then it is no longer on the
+ // dumpees queue, put it on the dump queue for dumpee to pick up.
+ //
+ MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+ }
+ ClusterConnectionProxy(shadowSession).deliveryRecord(
+ dr.getQueue()->getName(),
+ dr.getMessage().position,
+ dr.getTag(),
+ dr.getId(),
+ dr.isAcquired(),
+ dr.isAccepted(),
+ dr.isCancelled(),
+ dr.isComplete(),
+ dr.isEnded(),
+ dr.isWindowing());
+}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Thu Oct 30 14:07:28 2008
@@ -43,6 +43,7 @@
class QueueBinding;
class QueuedMessage;
class SessionHandler;
+class DeliveryRecord;
} // namespace broker
@@ -57,6 +58,8 @@
*/
class DumpClient : public sys::Runnable {
public:
+ static const std::string DUMP; // Name for special dump queue and exchange.
+
DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&,
broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
@@ -70,10 +73,12 @@
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
void dumpMessage(const broker::QueuedMessage&);
+ void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
- void dumpConsumer(broker::SemanticState::ConsumerImpl*);
+ void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
+ void dumpUnacked(const broker::DeliveryRecord&);
private:
MemberId dumperId;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct 30 14:07:28 2008
@@ -220,31 +220,90 @@
uint16_t channel;
};
-// FIXME aconway 2008-10-20: dump Tx state.
+QPID_AUTO_TEST_CASE(testUnacked) {
+ // Verify replication of unacknowledged messages.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+
+ Message m;
+
+ SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+ c0.session.queueDeclare("q1");
+ c0.session.messageTransfer(arg::content=Message("11","q1"));
+ LocalQueue q1;
+ c0.subs.subscribe(q1, "q1", manualAccept);
+ BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+ SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+ c0.session.queueDeclare("q2");
+ c0.session.messageTransfer(arg::content=Message("21","q2"));
+ c0.session.messageTransfer(arg::content=Message("22","q2"));
+
+ LocalQueue q2;
+ c0.subs.subscribe(q2, "q2", manualAcquire);
+ m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+ c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+ BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+ // Add new member while there are unacked messages.
+ cluster.add();
+ cluster.waitFor(2);
+ Client c1(cluster[1], "c1");
+
+ // Check queue counts
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+
+ // Unacked messages should be requeued when session is closed.
+ c0.session.close();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+ BOOST_CHECK(c1.subs.get(m, "q1", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "11");
+ BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "21");
+ BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "22");
+}
+
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
// Verify that we dump transaction state correctly to new members.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
+
+ // Do work in a transaction.
c0.session.txSelect();
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("1","q"));
- c0.session.txCommit();
-
- c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(1));
+ c0.session.messageTransfer(arg::content=Message("2","q"));
Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "1");
- c0.session.messageTransfer(arg::content=Message("2","q"));
+ // New member, TX not committed, c1 should see nothing.
cluster.add();
Client c1(cluster[1], "c1");
- // Not yet comitted, c1 should see nothing.
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+ // After commit c1 should see results of tx.
c0.session.txCommit();
- // c1 shoudl see results of tx.
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "2");
+
+ // Another transaction with both members active.
+ c0.session.messageTransfer(arg::content=Message("3","q"));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+ c0.session.txCommit();
+ BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "3");
}
QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Oct 30 14:07:28 2008
@@ -92,7 +92,7 @@
ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(fix.lq, "q");
- Catcher<ConnectionException> pop(bind(&LocalQueue::pop, boost::ref(fix.lq)));
+ Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
fix.connection.proxy.close();
BOOST_CHECK(pop.join());
}
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Oct 30 14:07:28 2008
@@ -70,8 +70,7 @@
A connection is dumped as followed:
- open as a normal connection.
- attach sessions, create consumers, set flow with normal AMQP cokmmands.
- - reset session state by sending session-state for each session.
- - frames following session-state are replay frames.
+ - send /reset additional session state with controls below.
- send shadow-ready to mark end of shadow dump.
- send dump-complete when entire dump is complete.
-->
@@ -83,8 +82,22 @@
<field name="notifyEnabled" type="bit"/>
</control>
+ <!-- Delivery-record for outgoing messages sent but not yet accepted. -->
+ <control name="delivery-record" code ="0x11">
+ <field name="queue" type="str8"/>
+ <field name="position" type="sequence-no"/>
+ <field name="tag" type="str8"/>
+ <field name="id" type="sequence-no"/>
+ <field name="acquired" type="bit"/> <!-- If set, message is on the dump queue; otherwise it is at position in queue. -->
+ <field name="accepted" type="bit"/>
+ <field name="cancelled" type="bit"/>
+ <field name="completed" type="bit"/>
+ <field name="ended" type="bit"/>
+ <field name="windowing" type="bit"/>
+ </control>
+
<!-- Complete a session state dump. -->
- <control name="session-state" code="0x11" label="Set session state during a brain dump.">
+ <control name="session-state" code="0x1F" label="Set session state during a brain dump.">
<!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->
@@ -97,15 +110,22 @@
</control>
<!-- Complete a shadow connection dump. -->
- <control name="shadow-ready" code="0x12" label="End of shadow connection dump.">
+ <control name="shadow-ready" code="0x20" label="End of shadow connection dump.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
</control>
<!-- Complete a cluster state dump. -->
- <control name="membership" code="0x13" label="Cluster membership details.">
+ <control name="membership" code="0x21" label="Cluster membership details.">
<field name="newbies" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
</control>
+
+ <!-- Set the position of a replicated queue. -->
+ <control name="queue-position" code="0x30">
+ <field name="queue" type="str8"/>
+ <field name="position" type="sequence-no"/>
+ </control>
+
</class>
</amqp>