You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/30 22:07:29 UTC

svn commit: r709242 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/client/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Thu Oct 30 14:07:28 2008
New Revision: 709242

URL: http://svn.apache.org/viewvc?rev=709242&view=rev
Log:
Replicate session state for un-acknowledged messages to new cluster members.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 30 14:07:28 2008
@@ -380,15 +380,12 @@
 };
 }// namespace
 
-bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+QueuedMessage Queue::find(SequenceNumber pos) const {
     Mutex::ScopedLock locker(messageLock);
     Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
-    if (i == messages.end())
-        return false;
-    else {
-        msg = *i;
-        return true;
-    }
+    if (i != messages.end())
+        return *i;
+    return QueuedMessage();
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -876,9 +873,6 @@
 }
 
 void Queue::setPosition(SequenceNumber n) {
-    if (n <= sequence)
-        throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
-                                                      << " for queue " << name));
+    Mutex::ScopedLock locker(messageLock);
     sequence = n;
-    --sequence;                 // Decrement so ++sequence will return n.
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 30 14:07:28 2008
@@ -230,12 +230,8 @@
              */
             QueuedMessage get();
 
-            /** Get the message at position pos
-             *@param msg out parameter, assigned to the message found.
-             *@param pos position to search for.
-             *@return True if there is a message at pos, false otherwise.
-             */
-            bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+            /** Get the message at position pos */
+            QueuedMessage find(framing::SequenceNumber pos) const;
 
             const QueuePolicy* getPolicy();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Oct 30 14:07:28 2008
@@ -100,10 +100,12 @@
 
     void readyToSend();
 
+    // Used by cluster to create replica sessions.
     template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
+    template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); }
     SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); } 
-
     boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+    void record(const DeliveryRecord& delivery) { semanticState.record(delivery); }
 
   private:
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Thu Oct 30 14:07:28 2008
@@ -33,12 +33,12 @@
 LocalQueue::LocalQueue() {}
 LocalQueue::~LocalQueue() {}
 
-Message LocalQueue::pop() { return get(); }
+Message LocalQueue::pop(sys::Duration timeout) { return get(timeout); }
 
-Message LocalQueue::get() {
+Message LocalQueue::get(sys::Duration timeout) {
     Message result;
-    bool ok = get(result, sys::TIME_INFINITE);
-    assert(ok); (void) ok;
+    bool ok = get(result, timeout);
+    if (!ok) throw Exception("Timed out waiting for a message");
     return result;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Thu Oct 30 14:07:28 2008
@@ -56,14 +56,14 @@
      */
     bool get(Message& result, sys::Duration timeout=0);
 
-    /** Get the next message off the local queue, or wait for a
-     * message from the broker queue.
-     *@exception ClosedException if subscription has been closed.
+    /** Get the next message off the local queue, or wait up to the timeout
+     * for a message from the broker queue.
+     *@exception ClosedException if subscription is closed; Exception if the timeout is exceeded.
      */
-    Message get();
+    Message get(sys::Duration timeout=sys::TIME_INFINITE);
 
     /** Synonym for get(). */
-    Message pop();
+    Message pop(sys::Duration timeout=sys::TIME_INFINITE);
 
     /** Return true if local queue is empty. */
     bool empty() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Oct 30 14:07:28 2008
@@ -19,6 +19,7 @@
  *
  */
 #include "Connection.h"
+#include "DumpClient.h"
 #include "Cluster.h"
 
 #include "qpid/broker/SessionState.h"
@@ -73,8 +74,6 @@
     output.deliverDoOutput(requested);
 }
 
-// FIXME aconway 2008-10-15:  changes here, dubious.
-
 // Received from a directly connected client.
 void Connection::received(framing::AMQFrame& f) {
     QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -214,6 +213,50 @@
     return self.first == cluster.getId() && self.second == 0;
 }
 
+void Connection::deliveryRecord(const string& qname,
+                                const SequenceNumber& position,
+                                const string& tag,
+                                const SequenceNumber& id,
+                                bool acquired,
+                                bool accepted,
+                                bool cancelled,
+                                bool completed,
+                                bool ended,
+                                bool windowing)
+{
+    broker::QueuedMessage m;
+    broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
+    if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
+    broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+    if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
+
+    if (!ended) {               // Has a message
+        if (acquired)           // Message at front of dump queue
+            m = dumpQueue->get();
+        else                    // Message at original position in original queue
+            m = queue->find(position);
+        if (!m.payload)
+            throw Exception(QPID_MSG("deliveryRecord no dump message"));
+    }
+
+    broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing);
+    dr.setId(id);
+    if (cancelled) dr.cancel(dr.getTag());
+    if (completed) dr.complete();
+    if (ended) dr.setEnded();   // Existence of message
+
+    broker::SessionHandler& h = connection.getChannel(currentChannel);
+    broker::SessionState* s = h.getSession();
+    assert(s);
+    s->record(dr);
+}
+
+void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
+    shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+    if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+    q->setPosition(position);
+}
+
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
     const char* type="unknown";
     if (c.isLocal()) type = "local";
@@ -222,5 +265,7 @@
     return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
 }
 
+    
+
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Oct 30 14:07:28 2008
@@ -94,17 +94,30 @@
     // ==== Used in catch-up mode to build initial state.
     // 
     // State dump methods.
-    void sessionState(const SequenceNumber& replayStart,
-                      const SequenceNumber& sendCommandPoint,
-                      const SequenceSet& sentIncomplete,
-                      const SequenceNumber& expected,
-                      const SequenceNumber& received,
-                      const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+    void sessionState(const framing::SequenceNumber& replayStart,
+                      const framing::SequenceNumber& sendCommandPoint,
+                      const framing::SequenceSet& sentIncomplete,
+                      const framing::SequenceNumber& expected,
+                      const framing::SequenceNumber& received,
+                      const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
     
     void shadowReady(uint64_t memberId, uint64_t connectionId);
 
     void membership(const framing::FieldTable&, const framing::FieldTable&);
 
+    void deliveryRecord(const std::string& queue,
+                        const framing::SequenceNumber& position,
+                        const std::string& tag,
+                        const framing::SequenceNumber& id,
+                        bool acquired,
+                        bool accepted,
+                        bool cancelled,
+                        bool completed,
+                        bool ended,
+                        bool windowing);
+
+    void queuePosition(const std::string&, const framing::SequenceNumber&);
+    
   private:
     bool catcUp;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 30 14:07:28 2008
@@ -56,14 +56,11 @@
 using client::SessionBase_0_10Access;
 
 struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
-    ClusterConnectionProxy(client::Connection& c) :
+    ClusterConnectionProxy(client::Connection c) :
         AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+    ClusterConnectionProxy(client::AsyncSession s) :
+        AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {}
 };
-struct ClusterProxy : public AMQP_AllProxy::Cluster {
-    ClusterProxy(client::Connection& c) :
-        AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {}
-};
-
 
 // Create a connection with special version that marks it as a catch-up connection.
 client::Connection catchUpConnection() {
@@ -73,7 +70,7 @@
 }
 
 // Send a control body directly to the session.
-void send(client::Session& s, const AMQBody& body) {
+void send(client::AsyncSession& s, const AMQBody& body) {
     client::SessionBase_0_10Access sb(s);
     sb.get()->send(body);
 }
@@ -94,19 +91,23 @@
 
 DumpClient::~DumpClient() {}
 
-// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
-static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
-static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); 
+// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+static const char DUMP_CHARS[] = "\000qpid-dump";
+const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); 
 
 void DumpClient::dump() {
     QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
     Broker& b = dumperBroker;
     b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
-    // Catch-up exchange is used to route messages to the proper queue without modifying routing key.
-    session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
+
+    // Dump exchange is used to route messages to the proper queue without modifying routing key.
+    session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
     b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+    session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
     session.sync();
     session.close();
+
     std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
     AMQFrame frame(map.asMethodBody());
     client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -134,6 +135,39 @@
         arg::arguments=ex->getArgs());
 }
 
+/** Bind a queue to the dump exchange and dump messages to it
+ * setting the message position as needed.
+ */
+class MessageDumper {
+    std::string queue;
+    bool haveLastPos;
+    framing::SequenceNumber lastPos;
+    client::AsyncSession session;
+
+  public:
+
+    MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+        session.exchangeBind(queue, DumpClient::DUMP);
+    }
+
+    ~MessageDumper() {
+        session.exchangeUnbind(queue, DumpClient::DUMP);
+    }
+
+    void dump(const broker::QueuedMessage& message) {
+        if (!haveLastPos || message.position - lastPos != 1)  {
+            ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
+            haveLastPos = true;
+        }
+        lastPos = message.position;
+        SessionBase_0_10Access sb(session);
+        framing::MessageTransferBody transfer(
+            framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+        sb.get()->send(transfer, message.payload->getFrames());
+    }
+};
+
+
 void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
     session.queueDeclare(
         q->getName(),
@@ -143,19 +177,11 @@
         arg::exclusive=q->hasExclusiveConsumer(),
         arg::autoDelete=q->isAutoDelete(),
         arg::arguments=q->getSettings());
-
-    session.exchangeBind(q->getName(), CATCH_UP, std::string());
-    q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1));
-    session.exchangeUnbind(q->getName(), CATCH_UP, std::string());
+    MessageDumper dumper(q->getName(), session);
+    q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
     q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
 }
 
-void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
-    SessionBase_0_10Access sb(session);
-    framing::MessageTransferBody transfer(
-        framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
-    sb.get()->send(transfer, message.payload->getFrames());
-}
 
 void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
     session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
@@ -190,11 +216,11 @@
 
     // Re-create session state on remote connection.
 
-    // For reasons unknown, boost::bind does not work here with boost 1.33.
+    // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
     ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+    ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
 
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
-
     //  Adjust for message in progress, will be sent after state update.
     SequenceNumber received = ss->receiverGetReceived().command;
     if (inProgress)  
@@ -221,7 +247,7 @@
     QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
 }
 
-void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
     QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
     using namespace message;
     shadowSession.messageSubscribe(
@@ -246,5 +272,27 @@
     client::SessionBase_0_10Access(shadowSession).get()->send(state);
     QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
 }
+    
+void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
+    assert(dr.isEnded() || dr.getMessage().payload);
+
+    if (!dr.isEnded() && dr.isAcquired()) {
+        // If the message is acquired then it is no longer on the
+        // dumpees queue, put it on the dump queue for dumpee to pick up.
+        //
+        MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+    }
+    ClusterConnectionProxy(shadowSession).deliveryRecord(
+        dr.getQueue()->getName(),
+        dr.getMessage().position,
+        dr.getTag(),
+        dr.getId(),
+        dr.isAcquired(),
+        dr.isAccepted(),
+        dr.isCancelled(),
+        dr.isComplete(),
+        dr.isEnded(),
+        dr.isWindowing());
+}
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Thu Oct 30 14:07:28 2008
@@ -43,6 +43,7 @@
 class QueueBinding;
 class QueuedMessage;
 class SessionHandler;
+class DeliveryRecord;
 
 } // namespace broker
 
@@ -57,6 +58,8 @@
  */
 class DumpClient : public sys::Runnable {
   public:
+    static const std::string DUMP; // Name for special dump queue and exchange.
+    
     DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&,
                broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
                const boost::function<void()>& done,
@@ -70,10 +73,12 @@
     void dumpQueue(const boost::shared_ptr<broker::Queue>&);
     void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
     void dumpMessage(const broker::QueuedMessage&);
+    void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
     void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
     void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
     void dumpSession(broker::SessionHandler& s);
-    void dumpConsumer(broker::SemanticState::ConsumerImpl*);
+    void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
+    void dumpUnacked(const broker::DeliveryRecord&);
     
   private:
     MemberId dumperId;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct 30 14:07:28 2008
@@ -220,31 +220,90 @@
     uint16_t channel;
 };
 
-// FIXME aconway 2008-10-20: dump Tx state.
+QPID_AUTO_TEST_CASE(testUnacked) {
+    // Verify replication of unacknowledged messages.
+    ClusterFixture cluster(1);
+    Client c0(cluster[0], "c0"); 
+
+    Message m;
+
+    SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+    c0.session.queueDeclare("q1");
+    c0.session.messageTransfer(arg::content=Message("11","q1"));
+    LocalQueue q1;
+    c0.subs.subscribe(q1, "q1", manualAccept);
+    BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+    SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+    c0.session.queueDeclare("q2");
+    c0.session.messageTransfer(arg::content=Message("21","q2"));
+    c0.session.messageTransfer(arg::content=Message("22","q2"));
+
+    LocalQueue q2;
+    c0.subs.subscribe(q2, "q2", manualAcquire);
+    m = q2.get(TIME_SEC);  // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(m.getData(), "21");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+    c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+    BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+    // Add new member while there are unacked messages.
+    cluster.add();
+    cluster.waitFor(2);
+    Client c1(cluster[1], "c1"); 
+
+    // Check queue counts
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+
+    // Unacked messages should be requeued when session is closed.
+    c0.session.close();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+    BOOST_CHECK(c1.subs.get(m, "q1", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "11");
+    BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "21");
+    BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "22");
+}
+
 QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
     // Verify that we dump transaction state correctly to new members.
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
+
+    // Do work in a transaction.
     c0.session.txSelect();
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=Message("1","q"));
-    c0.session.txCommit();
-    
-    c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(1));
+    c0.session.messageTransfer(arg::content=Message("2","q"));
     Message m;
-    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+    BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "1");
-    c0.session.messageTransfer(arg::content=Message("2","q"));
 
+    // New member, TX not committed, c1 should see nothing.
     cluster.add();
     Client c1(cluster[1], "c1");
-    // Not yet comitted, c1 should see nothing.
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+    // After commit c1 should see results of tx.
     c0.session.txCommit();
-    // c1 shoudl see results of tx.
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
     BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "2");
+
+    // Another transaction with both members active.
+    c0.session.messageTransfer(arg::content=Message("3","q"));
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+    c0.session.txCommit();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "3");
 }
 
 QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Oct 30 14:07:28 2008
@@ -92,7 +92,7 @@
     ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(fix.lq, "q");
-    Catcher<ConnectionException> pop(bind(&LocalQueue::pop, boost::ref(fix.lq)));
+    Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
     fix.connection.proxy.close();
     BOOST_CHECK(pop.join());
 }

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=709242&r1=709241&r2=709242&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Oct 30 14:07:28 2008
@@ -70,8 +70,7 @@
 	 A connection is dumped as followed:
 	 - open as a normal connection.
 	 - attach sessions, create consumers, set flow with normal AMQP cokmmands.
-	 - reset session state by sending session-state for each session.
-	  - frames following session-state are replay frames.
+	 - send /reset additional session state with controls below.
 	 - send shadow-ready to mark end of shadow dump.
 	 - send dump-complete when entire dump is complete.
     -->
@@ -83,8 +82,22 @@
       <field name="notifyEnabled" type="bit"/>
     </control>
 
+    <!-- Delivery-record for outgoing messages sent but not yet accepted. -->
+    <control name="delivery-record" code ="0x11">
+      <field name="queue" type="str8"/>
+      <field name="position" type="sequence-no"/>
+      <field name="tag" type="str8"/>
+      <field name="id" type="sequence-no"/>
+      <field name="acquired" type="bit"/>		       <!--If not set, message follows. -->
+      <field name="accepted" type="bit"/>
+      <field name="cancelled" type="bit"/>
+      <field name="completed" type="bit"/>
+      <field name="ended" type="bit"/>
+      <field name="windowing" type="bit"/>
+    </control>
+    
     <!-- Complete a session state dump. -->
-    <control name="session-state" code="0x11" label="Set session state during a brain dump.">
+    <control name="session-state" code="0x1F" label="Set session state during a brain dump.">
       <!-- Target session deduced from channel number.  -->
       <field name="replay-start" type="sequence-no"/>	       <!-- Replay frames will start from this point.-->
       <field name="command-point" type="sequence-no"/>	       <!-- Id of next command sent -->
@@ -97,15 +110,22 @@
     </control>
 
     <!-- Complete a shadow connection dump. -->
-    <control name="shadow-ready" code="0x12" label="End of shadow connection dump.">
+    <control name="shadow-ready" code="0x20" label="End of shadow connection dump.">
       <field name="member-id" type="uint64"/>
       <field name="connection-id" type="uint64"/>
     </control>
 
     <!-- Complete a cluster state dump. -->
-    <control name="membership" code="0x13" label="Cluster membership details.">
+    <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="newbies" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
     </control>
+
+    <!-- Set the position of a replicated queue. -->
+    <control name="queue-position" code="0x30">
+      <field name="queue" type="str8"/>
+      <field name="position" type="sequence-no"/>
+    </control>
+    
   </class>
 </amqp>