You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/01/25 19:20:49 UTC

svn commit: r1438629 [3/10] - in /qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qpid/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/t/ qpid/cpp/bindi...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Jan 25 18:20:39 2013
@@ -27,6 +27,9 @@
 #include "qpid/ptr_map.h"
 
 #include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
 #include <iostream>
 
 using boost::intrusive_ptr;
@@ -35,6 +38,30 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
+namespace {
+    typedef boost::function0<void> FireFunction;
+    struct DtxCleanup : public qpid::sys::TimerTask
+    {
+        FireFunction fireFunction;
+
+        DtxCleanup(uint32_t timeout, FireFunction f);
+        void fire();
+    };
+
+    DtxCleanup::DtxCleanup(uint32_t _timeout, FireFunction f)
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), fireFunction(f){}
+
+    void DtxCleanup::fire()
+    {
+        try {
+            fireFunction();
+        } catch (qpid::ConnectionException& /*e*/) {
+            //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+        }
+    }
+
+}
+
 DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
 
 DtxManager::~DtxManager() {}
@@ -156,19 +183,7 @@ void DtxManager::timedout(const std::str
     } else {
         ptr_map_ptr(i)->timedout();
         //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion?
-        //timer.add(intrusive_ptr<TimerTask>(new DtxCleanup(60*30/*30 mins*/, *this, xid)));
-    }
-}
-
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
-    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
-
-void DtxManager::DtxCleanup::fire()
-{
-    try {
-        mgr.remove(xid);
-    } catch (ConnectionException& /*e*/) {
-        //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+        //timer->add(new DtxCleanup(60*30/*30 mins*/, boost::bind(&DtxManager::remove, this, xid)));
     }
 }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxManager.h Fri Jan 25 18:20:39 2013
@@ -31,20 +31,15 @@
 #include "qpid/ptr_map.h"
 
 namespace qpid {
+namespace sys {
+class Timer;
+}
+
 namespace broker {
 
 class DtxManager{
     typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
 
-    struct DtxCleanup : public sys::TimerTask
-    {
-        DtxManager& mgr;
-        const std::string& xid;
-
-        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
-        void fire();
-    };
-
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
@@ -68,11 +63,6 @@ public:
     void setStore(TransactionalStore* store);
     void setTimer(sys::Timer& t) { timer = &t; }
 
-    // Used by cluster for replication.
-    template<class F> void each(F f) const {
-        for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
-            f(*ptr_map_ptr(i));
-    }
     DtxWorkRecord* getWork(const std::string& xid);
     bool exists(const std::string& xid);
     static std::string convert(const framing::Xid& xid);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Jan 25 18:20:39 2013
@@ -20,7 +20,10 @@
  */
 #include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/DtxManager.h"
+#include "qpid/broker/DtxTimeout.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Timer.h"
+
 #include <boost/format.hpp>
 #include <boost/mem_fn.hpp>
 using boost::mem_fn;
@@ -39,6 +42,12 @@ DtxWorkRecord::~DtxWorkRecord()
     }
 }
 
+void DtxWorkRecord::setTimeout(boost::intrusive_ptr<DtxTimeout> t)
+{ timeout = t; }
+
+boost::intrusive_ptr<DtxTimeout> DtxWorkRecord::getTimeout()
+{ return timeout; }
+
 bool DtxWorkRecord::prepare()
 {
     Mutex::ScopedLock locker(lock);
@@ -176,17 +185,3 @@ void DtxWorkRecord::timedout()
     }
     abort();
 }
-
-size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
-    Work::iterator i = std::find(work.begin(), work.end(), buf);
-    if (i == work.end()) throw NotFoundException(
-        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
-    return i - work.begin();
-}
-
-DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
-    if (i > work.size())
-        throw NotFoundException(
-            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
-    return work[i];
-}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Jan 25 18:20:39 2013
@@ -23,7 +23,6 @@
 
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/TransactionalStore.h"
 
 #include "qpid/framing/amqp_types.h"
@@ -38,6 +37,8 @@
 namespace qpid {
 namespace broker {
 
+struct DtxTimeout;
+
 /**
  * Represents the work done under a particular distributed transaction
  * across potentially multiple channels. Identified by a xid. Allows
@@ -71,19 +72,13 @@ public:
     QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops);
     void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
     void timedout();
-    void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
-    boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+    void setTimeout(boost::intrusive_ptr<DtxTimeout> t);
+    boost::intrusive_ptr<DtxTimeout> getTimeout();
     std::string getXid() const { return xid; }
     bool isCompleted() const { return completed; }
     bool isRolledback() const { return rolledback; }
     bool isPrepared() const { return prepared; }
     bool isExpired() const { return expired; }
-
-    // Used by cluster update;
-    size_t size() const { return work.size(); }
-    DtxBuffer::shared_ptr operator[](size_t i) const;
-    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
-    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
 }} // qpid::broker

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Jan 25 18:20:39 2013
@@ -210,8 +210,6 @@ Exchange::Exchange(const string& _name, 
 
     ive = _args.get(qpidIVE);
     if (ive) {
-        if (broker && broker->isInCluster())
-            throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
         QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
     }
 }
@@ -225,6 +223,7 @@ Exchange::~Exchange ()
 void Exchange::setAlternate(Exchange::shared_ptr _alternate)
 {
     alternate = _alternate;
+    alternate->incAlternateUsers();
     if (mgmtExchange != 0) {
         if (alternate.get() != 0)
             mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Jan 25 18:20:39 2013
@@ -79,10 +79,7 @@ pair<Exchange::shared_ptr, bool> Exchang
             }
             exchanges[name] = exchange;
             result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
-            if (alternate) {
-                exchange->setAlternate(alternate);
-                alternate->incAlternateUsers();
-            }
+            if (alternate) exchange->setAlternate(alternate);
             // Call exchangeCreate inside the lock to ensure correct ordering.
             if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
         } else {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp Fri Jan 25 18:20:39 2013
@@ -33,6 +33,7 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/UrlArray.h"
 
 namespace qpid {
@@ -147,7 +148,6 @@ Link::Link(const string&  _name,
       persistenceId(0), broker(_broker), state(0),
       visitCount(0),
       currentInterval(1),
-      closing(false),
       reconnectNext(0),         // Index of next address for reconnecting in url.
       nextFreeChannel(1),
       freeChannels(1, framing::CHANNEL_MAX),
@@ -170,12 +170,8 @@ Link::Link(const string&  _name,
             agent->addObject(mgmtObject, 0, durable);
         }
     }
-    if (links->isPassive()) {
-        setStateLH(STATE_PASSIVE);
-    } else {
-        setStateLH(STATE_WAITING);
-        startConnectionLH();
-    }
+    setStateLH(STATE_WAITING);
+    startConnectionLH();
     broker->getTimer().add(timerTask);
 
     if (failover) {
@@ -209,9 +205,6 @@ void Link::setStateLH (int newState)
 
     state = newState;
 
-    if (hideManagement())
-        return;
-
     switch (state)
     {
     case STATE_WAITING     : mgmtObject->set_state("Waiting");     break;
@@ -219,7 +212,7 @@ void Link::setStateLH (int newState)
     case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
     case STATE_FAILED      : mgmtObject->set_state("Failed");      break;
     case STATE_CLOSED      : mgmtObject->set_state("Closed");      break;
-    case STATE_PASSIVE     : mgmtObject->set_state("Passive");      break;
+    case STATE_CLOSING     : mgmtObject->set_state("Closing");     break;
     }
 }
 
@@ -230,40 +223,39 @@ void Link::startConnectionLH ()
         // Set the state before calling connect.  It is possible that connect
         // will fail synchronously and call Link::closed before returning.
         setStateLH(STATE_CONNECTING);
-        broker->connect (host, boost::lexical_cast<std::string>(port), transport,
+        broker->connect (name, host, boost::lexical_cast<std::string>(port), transport,
                          boost::bind (&Link::closed, this, _1, _2));
         QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port);
     } catch(const std::exception& e) {
         QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
                  << e.what());
         setStateLH(STATE_WAITING);
-        if (!hideManagement())
-            mgmtObject->set_lastError (e.what());
+        mgmtObject->set_lastError (e.what());
     }
 }
 
 void Link::established(Connection* c)
 {
-    if (state == STATE_PASSIVE) return;
     stringstream addr;
     addr << host << ":" << port;
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
-    if (!hideManagement() && agent)
+    if (agent)
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-    bool isClosing = false;
+    bool isClosing = true;
     {
         Mutex::ScopedLock mutex(lock);
-        setStateLH(STATE_OPERATIONAL);
-        currentInterval = 1;
-        visitCount      = 0;
-        connection = c;
-        isClosing = closing;
+        if (state != STATE_CLOSING) {
+            isClosing = false;
+            setStateLH(STATE_OPERATIONAL);
+            currentInterval = 1;
+            visitCount      = 0;
+            connection = c;
+            c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        }
     }
     if (isClosing)
         destroy();
-    else // Process any IO tasks bridges added before established.
-        c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 
@@ -288,11 +280,12 @@ class DetachedCallback : public SessionH
 };
 }
 
-void Link::opened() {
+void Link::opened()
+{
     Mutex::ScopedLock mutex(lock);
-    if (!connection) return;
+    if (!connection || state != STATE_OPERATIONAL) return;
 
-    if (!hideManagement() && connection->GetManagementObject()) {
+    if (connection->GetManagementObject()) {
         mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
     }
 
@@ -347,37 +340,43 @@ void Link::opened() {
     }
 }
 
+
+// called when connection attempt fails (see startConnectionLH)
 void Link::closed(int, std::string text)
 {
-    Mutex::ScopedLock mutex(lock);
     QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
 
-    connection = 0;
+    bool isClosing = false;
+    {
+        Mutex::ScopedLock mutex(lock);
+
+        connection = 0;
 
-    if (!hideManagement()) {
         mgmtObject->set_connectionRef(qpid::management::ObjectId());
         if (state == STATE_OPERATIONAL && agent) {
             stringstream addr;
             addr << host << ":" << port;
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
         }
-    }
 
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        (*i)->closed();
-        created.push_back(*i);
-    }
-    active.clear();
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            (*i)->closed();
+            created.push_back(*i);
+        }
+        active.clear();
 
-    if (state != STATE_FAILED && state != STATE_PASSIVE)
-    {
-        setStateLH(STATE_WAITING);
-        if (!hideManagement())
+        if (state == STATE_CLOSING) {
+            isClosing = true;
+        } else if (state != STATE_FAILED) {
+            setStateLH(STATE_WAITING);
             mgmtObject->set_lastError (text);
+        }
     }
+    if (isClosing) destroy();
 }
 
-// Called in connection IO thread, cleans up the connection before destroying Link
+// Cleans up the connection before destroying Link.  Must be called in connection thread
+// if the connection is active.  Note to callers: this may result in "delete this"!
 void Link::destroy ()
 {
     Bridges toDelete;
@@ -407,7 +406,9 @@ void Link::destroy ()
     for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
         (*i)->close();
     toDelete.clear();
-    listener(this); // notify LinkRegistry that this Link has been destroyed
+    // notify LinkRegistry that this Link has been destroyed.  Will result in "delete
+    // this" if LinkRegistry is holding the last shared pointer to *this
+    listener(this);
 }
 
 void Link::add(Bridge::shared_ptr bridge)
@@ -449,7 +450,7 @@ void Link::ioThreadProcessing()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (state != STATE_OPERATIONAL || closing)
+    if (state != STATE_OPERATIONAL)
         return;
 
     // check for bridge session errors and recover
@@ -486,9 +487,9 @@ void Link::ioThreadProcessing()
 void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
-    if (closing) return;
-    if (state == STATE_WAITING)
-    {
+
+    switch (state) {
+    case STATE_WAITING:
         visitCount++;
         if (visitCount >= currentInterval)
         {
@@ -501,11 +502,17 @@ void Link::maintenanceVisit ()
                 startConnectionLH();
             }
         }
+        break;
+
+    case STATE_OPERATIONAL:
+        if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
+            connection && connection->isOpen())
+            connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        break;
+
+    default:    // no-op for all other states
+        break;
     }
-    else if (state == STATE_OPERATIONAL &&
-             (!active.empty() || !created.empty() || !cancellations.empty()) &&
-             connection && connection->isOpen())
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::reconnectLH(const Address& a)
@@ -514,14 +521,13 @@ void Link::reconnectLH(const Address& a)
     port = a.port;
     transport = a.protocol;
 
-    if (!hideManagement()) {
-        stringstream errorString;
-        errorString << "Failing over to " << a;
-        mgmtObject->set_lastError(errorString.str());
-        mgmtObject->set_host(host);
-        mgmtObject->set_port(port);
-        mgmtObject->set_transport(transport);
-    }
+    stringstream errorString;
+    errorString << "Failing over to " << a;
+    mgmtObject->set_lastError(errorString.str());
+    mgmtObject->set_host(host);
+    mgmtObject->set_port(port);
+    mgmtObject->set_transport(transport);
+
     startConnectionLH();
 }
 
@@ -538,12 +544,6 @@ bool Link::tryFailoverLH() {
     return false;
 }
 
-// Management updates for a link are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
-    return !mgmtObject || ( broker && broker->isInCluster());
-}
-
 // Allocate channel from link free pool
 framing::ChannelId Link::nextChannel()
 {
@@ -583,10 +583,17 @@ void Link::returnChannel(framing::Channe
 
 void Link::notifyConnectionForced(const string text)
 {
-    Mutex::ScopedLock mutex(lock);
-    setStateLH(STATE_FAILED);
-    if (!hideManagement())
-        mgmtObject->set_lastError(text);
+    bool isClosing = false;
+    {
+        Mutex::ScopedLock mutex(lock);
+        if (state == STATE_CLOSING) {
+            isClosing = true;
+        } else {
+            setStateLH(STATE_FAILED);
+            mgmtObject->set_lastError(text);
+        }
+    }
+    if (isClosing) destroy();
 }
 
 void Link::setPersistenceId(uint64_t id) const
@@ -676,14 +683,25 @@ ManagementObject::shared_ptr Link::GetMa
 
 void Link::close() {
     QPID_LOG(debug, "Link::close(), link=" << name );
-    Mutex::ScopedLock mutex(lock);
-    if (!closing) {
-        closing = true;
-        if (state != STATE_CONNECTING && connection) {
-            //connection can only be closed on the connections own IO processing thread
-            connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+    bool destroy_now = false;
+    {
+        Mutex::ScopedLock mutex(lock);
+        if (state != STATE_CLOSING) {
+            int old_state = state;
+            setStateLH(STATE_CLOSING);
+            if (connection) {
+                //connection can only be closed on the connection's own IO processing thread
+                connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+            } else if (old_state == STATE_CONNECTING) {
+                // cannot destroy Link now since a connection request is outstanding.
+                // destroy the link after we get a response (see Link::established,
+                // Link::closed, Link::notifyConnectionForced, etc).
+            } else {
+                destroy_now = true;
+            }
         }
     }
+    if (destroy_now) destroy();
 }
 
 
@@ -727,22 +745,6 @@ Manageable::status_t Link::ManagementMet
     return Manageable::STATUS_UNKNOWN_METHOD;
 }
 
-void Link::setPassive(bool passive)
-{
-    Mutex::ScopedLock mutex(lock);
-    if (passive) {
-        setStateLH(STATE_PASSIVE);
-    } else {
-        if (state == STATE_PASSIVE) {
-            setStateLH(STATE_WAITING);
-        } else {
-            QPID_LOG(warning, "Ignoring attempt to activate non-passive link "
-                     << host << ":" << port);
-        }
-    }
-}
-
-
 /** utility to clean up connection resources correctly */
 void Link::closeConnection( const std::string& reason)
 {
@@ -778,28 +780,6 @@ namespace {
     const std::string FAILOVER_INDEX("failover-index");
 }
 
-void Link::getState(framing::FieldTable& state) const
-{
-    state.clear();
-    Mutex::ScopedLock mutex(lock);
-    if (!url.empty()) {
-        state.setString(FAILOVER_ADDRESSES, url.str());
-        state.setInt(FAILOVER_INDEX, reconnectNext);
-    }
-}
-
-void Link::setState(const framing::FieldTable& state)
-{
-    Mutex::ScopedLock mutex(lock);
-    if (state.isSet(FAILOVER_ADDRESSES)) {
-        Url failovers(state.getAsString(FAILOVER_ADDRESSES));
-        setUrl(failovers);
-    }
-    if (state.isSet(FAILOVER_INDEX)) {
-        reconnectNext = state.getAsInt(FAILOVER_INDEX);
-    }
-}
-
 std::string Link::createName(const std::string& transport,
                              const std::string& host,
                              uint16_t  port)
@@ -810,14 +790,6 @@ std::string Link::createName(const std::
     return linkName.str();
 }
 
-
-bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
-{
-    Mutex::ScopedLock mutex(lock);
-    return (isConnecting() && _port == port && _host == host);
-}
-
-
 const std::string Link::exchangeTypeName("qpid.LinkExchange");
 
 }} // namespace qpid::broker

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h Fri Jan 25 18:20:39 2013
@@ -74,7 +74,6 @@ class Link : public PersistableConfig, p
     int     state;
     uint32_t visitCount;
     uint32_t currentInterval;
-    bool     closing;
     Url      url;       // URL can contain many addresses.
     size_t   reconnectNext; // Index for next re-connect attempt
 
@@ -98,7 +97,7 @@ class Link : public PersistableConfig, p
     static const int STATE_OPERATIONAL = 3;
     static const int STATE_FAILED      = 4;
     static const int STATE_CLOSED      = 5;
-    static const int STATE_PASSIVE     = 6;
+    static const int STATE_CLOSING     = 6;  // Waiting for outstanding connect to complete first
 
     static const uint32_t MAX_INTERVAL = 32;
 
@@ -107,7 +106,6 @@ class Link : public PersistableConfig, p
     void destroy();                  // Cleanup connection before link goes away
     void ioThreadProcessing();       // Called on connection's IO thread by request
     bool tryFailoverLH();            // Called during maintenance visit
-    bool hideManagement() const;
     void reconnectLH(const Address&); //called by LinkRegistry
 
     // connection management (called by LinkRegistry)
@@ -116,7 +114,6 @@ class Link : public PersistableConfig, p
     void closed(int, std::string);   // Called when connection goes away
     void notifyConnectionForced(const std::string text);
     void closeConnection(const std::string& reason);
-    bool pendingConnection(const std::string& host, uint16_t port) const;  // is Link trying to connect to this remote?
 
     friend class LinkRegistry; // to call established, opened, closed
 
@@ -167,7 +164,6 @@ class Link : public PersistableConfig, p
     std::string getPassword()      { return password; }
     Broker* getBroker()       { return broker; }
 
-    void setPassive(bool p);
     bool isConnecting() const { return state == STATE_CONNECTING; }
 
     // PersistableConfig:
@@ -190,10 +186,6 @@ class Link : public PersistableConfig, p
     static const std::string exchangeTypeName;
     static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
 
-    // replicate internal state of this Link for clustering
-    void getState(framing::FieldTable& state) const;
-    void setState(const framing::FieldTable& state);
-
     /** create a name for a link (if none supplied by user config) */
     static std::string createName(const std::string& transport,
                                   const std::string& host,

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Jan 25 18:20:39 2013
@@ -19,8 +19,10 @@
  *
  */
 #include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/Link.h"
+
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/Link.h"
 #include "qpid/log/Statement.h"
 #include <iostream>
 #include <boost/format.hpp>
@@ -42,7 +44,7 @@ namespace _qmf = qmf::org::apache::qpid:
 // factored: The persistence element should be factored separately
 LinkRegistry::LinkRegistry () :
     broker(0),
-    parent(0), store(0), passive(false),
+    parent(0), store(0),
     realm("")
 {
 }
@@ -59,7 +61,7 @@ class LinkRegistryConnectionObserver : p
 
 LinkRegistry::LinkRegistry (Broker* _broker) :
     broker(_broker),
-    parent(0), store(0), passive(false),
+    parent(0), store(0),
     realm(broker->getOptions().realm)
 {
     broker->getConnectionObservers().add(
@@ -270,38 +272,6 @@ MessageStore* LinkRegistry::getStore() c
     return store;
 }
 
-namespace {
-    void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
-    {
-        // Extract host and port of remote broker from connection id string.
-        //
-        // TODO aconway 2011-02-01: centralize code that constructs/parses connection
-        // management IDs. Currently sys:: protocol factories and IO plugins construct the
-        // IDs and LinkRegistry parses them.
-        // KAG: current connection id format assumed:
-        // "localhost:port-remotehost:port".  In the case of IpV6, the host addresses are
-        // contained within brackets "[...]", example:
-        // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
-        // if this assumption changes!
-        size_t separator = connId.find('-');
-        assert(separator != std::string::npos);
-        std::string remote = connId.substr(separator+1, std::string::npos);
-        separator = remote.rfind(":");
-        assert(separator != std::string::npos);
-        *host = remote.substr(0, separator);
-        // IPv6 - host is bracketed by "[]", strip them
-        if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
-            *host = host->substr(1, host->length() - 2);
-        }
-        try {
-            *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
-        } catch (const boost::bad_lexical_cast&) {
-            QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
-            assert(false);
-        }
-    }
-}
-
 /** find the Link that corresponds to the given connection */
 Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
 {
@@ -321,19 +291,15 @@ void LinkRegistry::notifyConnection(cons
     // create a mapping from connection id to link
     QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
     std::string host;
-    uint16_t port = 0;
-    extractHostPort( key, &host, &port );
     Link::shared_ptr link;
     {
         Mutex::ScopedLock locker(lock);
-        for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
-            if (l->second->pendingConnection(host, port)) {
-                link = l->second;
-                pendingLinks.erase(l);
-                connections[key] = link->getName();
-                QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
-                break;
-            }
+        LinkMap::iterator l = pendingLinks.find(key);
+        if (l != pendingLinks.end()) {
+            link = l->second;
+            pendingLinks.erase(l);
+            connections[key] = link->getName();
+            QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
         }
     }
 
@@ -448,26 +414,4 @@ std::string LinkRegistry::getAuthIdentit
     return link->getUsername();
 }
 
-
-void LinkRegistry::setPassive(bool p)
-{
-    Mutex::ScopedLock locker(lock);
-    passive = p;
-    if (passive) { QPID_LOG(info, "Passivating links"); }
-    else { QPID_LOG(info, "Activating links"); }
-    for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
-        i->second->setPassive(passive);
-    }
-}
-
-void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
-    Mutex::ScopedLock locker(lock);
-    for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
-}
-
-void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
-    Mutex::ScopedLock locker(lock);
-    for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
-}
-
 }} // namespace qpid::broker

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.h Fri Jan 25 18:20:39 2013
@@ -53,7 +53,6 @@ namespace broker {
         Broker* broker;
         management::Manageable* parent;
         MessageStore* store;
-        bool passive;
         std::string realm;
 
         boost::shared_ptr<Link> findLink(const std::string& key);
@@ -144,20 +143,6 @@ namespace broker {
         QPID_BROKER_EXTERN std::string getPassword        (const std::string& key);
         QPID_BROKER_EXTERN std::string getHost            (const std::string& key);
         QPID_BROKER_EXTERN uint16_t    getPort            (const std::string& key);
-
-        /**
-         * Called to alter passive state. In passive state the links
-         * and bridges managed by a link registry will be recorded and
-         * updated but links won't actually establish connections and
-         * bridges won't therefore pull or push any messages.
-         */
-        QPID_BROKER_EXTERN void setPassive(bool);
-        QPID_BROKER_EXTERN bool isPassive() { return passive; }
-
-        /** Iterate over each link in the registry. Used for cluster updates. */
-        QPID_BROKER_EXTERN void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
-        /** Iterate over each bridge in the registry. Used for cluster updates. */
-        QPID_BROKER_EXTERN void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
     };
 }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Fri Jan 25 18:20:39 2013
@@ -302,19 +302,6 @@ void MessageGroupManager::setDefaults(co
     defaultGroupId = groupId;
 }
 
-/** Cluster replication:
-
-   state map format:
-
-   { "group-state": [ {"name": <group-name>,
-                       "owner": <consumer-name>-or-empty,
-                       "acquired-ct": <acquired count>,
-                       "positions": [Seqnumbers, ... ]},
-                      {...}
-                    ]
-   }
-*/
-
 namespace {
     const std::string GROUP_NAME("name");
     const std::string GROUP_OWNER("owner");
@@ -324,100 +311,3 @@ namespace {
     const std::string GROUP_STATE("group-state");
 }
 
-
-/** Runs on UPDATER to snapshot current state */
-void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
-{
-    using namespace qpid::framing;
-    state.clear();
-    framing::Array groupState(TYPE_CODE_MAP);
-    for (GroupMap::const_iterator g = messageGroups.begin();
-         g != messageGroups.end(); ++g) {
-
-        framing::FieldTable group;
-        group.setString(GROUP_NAME, g->first);
-        group.setString(GROUP_OWNER, g->second.owner);
-        group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
-        framing::Array positions(TYPE_CODE_UINT32);
-        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
-        for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
-             p != g->second.members.end(); ++p) {
-            positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
-            acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
-        }
-        group.setArray(GROUP_POSITIONS, positions);
-        group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
-        groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
-    }
-    state.setArray(GROUP_STATE, groupState);
-
-    QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
-{
-    using namespace qpid::framing;
-    messageGroups.clear();
-    freeGroups.clear();
-    cachedGroup = 0;
-
-    framing::Array groupState(TYPE_CODE_MAP);
-
-    bool ok = state.getArray(GROUP_STATE, groupState);
-    if (!ok) {
-        QPID_LOG(error, "Unable to find message group state information for queue \"" <<
-                 qName << "\": cluster inconsistency error!");
-        return;
-    }
-
-    for (framing::Array::const_iterator g = groupState.begin();
-         g != groupState.end(); ++g) {
-        framing::FieldTable group;
-        ok = framing::getEncodedValue<FieldTable>(*g, group);
-        if (!ok) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": table encoding error!");
-            return;
-        }
-        MessageGroupManager::GroupState state;
-        if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": fields missing error!");
-            return;
-        }
-        state.group = group.getAsString(GROUP_NAME);
-        state.owner = group.getAsString(GROUP_OWNER);
-        state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
-        framing::Array positions(TYPE_CODE_UINT32);
-        ok = group.getArray(GROUP_POSITIONS, positions);
-        if (!ok) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": position encoding error!");
-            return;
-        }
-        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
-        ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
-        if (!ok || positions.count() != acquiredMsgs.count()) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": acquired flag encoding error!");
-            return;
-        }
-
-        Array::const_iterator a = acquiredMsgs.begin();
-        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
-            GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
-            mState.acquired = (*a++)->getIntegerValue<bool>();
-            state.members.push_back(mState);
-        }
-
-        messageGroups[state.group] = state;
-        if (!state.owned()) {
-            assert(state.members.size());
-            freeGroups[state.members.front().position] = &messageGroups[state.group];
-        }
-    }
-
-    QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
-}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageGroupManager.h Fri Jan 25 18:20:39 2013
@@ -25,11 +25,12 @@
 /* for managing message grouping on Queues */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/MessageDistributor.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/unordered_map.h"
 
+#include "boost/shared_ptr.hpp"
 #include <deque>
 
 namespace qpid {
@@ -39,8 +40,9 @@ class QueueObserver;
 struct QueueSettings;
 class MessageDistributor;
 class Messages;
+class Consumer;
 
-class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
+class MessageGroupManager : public QueueObserver, public MessageDistributor
 {
     static std::string defaultGroupId;  // assigned if no group id header present
 
@@ -101,10 +103,10 @@ class MessageGroupManager : public State
 
     MessageGroupManager(const std::string& header, const std::string& _qName,
                         Messages& container, unsigned int _timestamp=0 )
-      : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
-      groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName),
-      hits(0), misses(0),
-      lastMsg(0), cachedGroup(0) {}
+      : groupIdHeader( header ), timestamp(_timestamp), messages(container),
+        qName(_qName),
+        hits(0), misses(0),
+        lastMsg(0), cachedGroup(0) {}
     virtual ~MessageGroupManager();
 
     // QueueObserver iface
@@ -114,8 +116,6 @@ class MessageGroupManager : public State
     void dequeued( const Message& qm );
     void consumerAdded( const Consumer& ) {};
     void consumerRemoved( const Consumer& ) {};
-    void getState(qpid::framing::FieldTable& state ) const;
-    void setState(const qpid::framing::FieldTable&);
 
     // MessageDistributor iface
     bool acquire(const std::string& c, Message& );

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStore.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStore.h Fri Jan 25 18:20:39 2013
@@ -46,20 +46,6 @@ class MessageStore : public Transactiona
   public:
 
     /**
-     * If called after initialization but before recovery, will discard the database
-     * content and reinitialize as though it were a new installation. If the parameter
-     * saveStoreContent is true, the content of the store will be saved in such a way
-     * that the truncate can be reversed. This is used when cluster nodes recover and
-     *  must get their content from a cluster sync rather than directly from the store.
-     *
-     * @param saveStoreContent If true, will move content of the store to a backup
-     *                         location where they may be restored later if needed. It is
-     *                         not necessary to save more than one prior version of the
-     *                         store.
-     */
-    virtual void truncateInit(const bool saveStoreContent = false) = 0;
-
-    /**
      * Record the existence of a durable queue
      */
     virtual void create(PersistableQueue& queue,

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri Jan 25 18:20:39 2013
@@ -42,11 +42,6 @@ MessageStoreModule::~MessageStoreModule(
 
 bool MessageStoreModule::init(const Options*) { return true; }
 
-void MessageStoreModule::truncateInit(const bool pushDownStoreFiles)
-{
-    TRANSFER_EXCEPTION(store->truncateInit(pushDownStoreFiles));
-}
-
 void MessageStoreModule::create(PersistableQueue& queue, const FieldTable& args)
 {
     TRANSFER_EXCEPTION(store->create(queue, args));

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri Jan 25 18:20:39 2013
@@ -41,7 +41,6 @@ class MessageStoreModule : public Messag
     MessageStoreModule(boost::shared_ptr<MessageStore>& store);
 
     bool init(const Options* options);
-    void truncateInit(const bool pushDownStoreFiles = false);
     std::auto_ptr<TransactionContext> begin();
     std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     void prepare(TPCTransactionContext& txn);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Messages.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Messages.h Fri Jan 25 18:20:39 2013
@@ -91,13 +91,6 @@ class Messages
     virtual Message* find(const QueueCursor&) = 0;
 
     /**
-     * Add an already acquired message to the queue.
-     * Used by a cluster updatee to replicate acquired messages from the updater.
-     * Only need be implemented by subclasses that keep track of
-     * acquired messages.
-     */
-    //virtual void updateAcquired(const QueuedMessage&) { }
-    /**
      * Apply, the functor to each message held
      */
     virtual void foreach(Functor) = 0;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri Jan 25 18:20:39 2013
@@ -52,8 +52,6 @@ NullMessageStore::NullMessageStore() : n
 
 bool NullMessageStore::init(const Options* /*options*/) {return true;}
 
-void NullMessageStore::truncateInit(const bool /*pushDownStoreFiles*/) {}
-
 void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable& /*args*/)
 {
     queue.setPersistenceId(nextPersistenceId++);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri Jan 25 18:20:39 2013
@@ -44,7 +44,6 @@ class QPID_BROKER_CLASS_EXTERN NullMessa
     QPID_BROKER_EXTERN NullMessageStore();
 
     QPID_BROKER_EXTERN virtual bool init(const Options* options);
-    QPID_BROKER_EXTERN virtual void truncateInit(const bool pushDownStoreFiles = false);
     QPID_BROKER_EXTERN virtual std::auto_ptr<TransactionContext> begin();
     QPID_BROKER_EXTERN virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     QPID_BROKER_EXTERN virtual void prepare(TPCTransactionContext& txn);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jan 25 18:20:39 2013
@@ -45,6 +45,7 @@
 #include "qpid/framing/FieldValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/types/Variant.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -237,9 +238,6 @@ void Queue::deliver(Message msg, TxBuffe
     //'link' for whatever protocol is used; that would let protocol
     //specific stuff be kept out the queue
 
-    // Check for deferred delivery in a cluster.
-    if (broker && broker->deferDelivery(name, msg))
-        return;
     if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg, 0);
@@ -1152,6 +1150,7 @@ Queue::shared_ptr Queue::restore( QueueR
 void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
 {
     alternateExchange = exchange;
+    alternateExchange->incAlternateUsers();
     if (mgmtObject) {
         if (exchange.get() != 0)
             mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
@@ -1201,7 +1200,7 @@ void Queue::tryAutoDelete(Broker& broker
     if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
-        broker.getClusterTimer().add(queue->autoDeleteTask);
+        broker.getTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
         tryAutoDeleteImpl(broker, queue, connectionId, userId);
@@ -1431,15 +1430,6 @@ void Queue::observeEnqueue(const Message
     mgntEnqStats(m, mgmtObject, brokerMgmtObject);
 }
 
-// Note: accessing listeners outside of lock is dangerous.  Caller must ensure the queue's
-// state is not changed while listeners is referenced.
-QueueListeners& Queue::getListeners() { return listeners; }
-
-// Note: accessing messages outside of lock is dangerous.  Caller must ensure the queue's
-// state is not changed while messages is referenced.
-Messages& Queue::getMessages() { return *messages; }
-const Messages& Queue::getMessages() const { return *messages; }
-
 bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
 {
     if (deleted && !c->hideDeletedError())

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h Fri Jan 25 18:20:39 2013
@@ -38,7 +38,6 @@
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -56,6 +55,9 @@
 #include <algorithm>
 
 namespace qpid {
+namespace sys {
+class TimerTask;
+}
 namespace broker {
 class Broker;
 class Exchange;
@@ -370,7 +372,7 @@ class Queue : public boost::enable_share
      *
      * The _caller_ must ensure that any messages after pos have been dequeued.
      *
-     * Used by HA/cluster code for queue replication.
+     * Used by HA code for queue replication.
      */
     QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
 
@@ -402,11 +404,6 @@ class Queue : public boost::enable_share
      */
     QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges);
 
-    // For cluster update
-    QPID_BROKER_EXTERN QueueListeners& getListeners();
-    QPID_BROKER_EXTERN Messages& getMessages();
-    QPID_BROKER_EXTERN const Messages& getMessages() const;
-
     /**
      * Reserve space in policy for an enqueued message that
      * has been recovered in the prepared state (dtx only)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Fri Jan 25 18:20:39 2013
@@ -18,15 +18,36 @@
  * under the License.
  *
  */
-#include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueCleaner.h"
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/sys/Timer.h"
+
+#include <boost/function.hpp>
 #include <boost/bind.hpp>
 
 namespace qpid {
 namespace broker {
 
+namespace {
+    typedef boost::function0<void> FireFunction;
+    class Task : public sys::TimerTask
+    {
+    public:
+        Task(FireFunction f, sys::Duration duration);
+        void fire();
+    private:
+        FireFunction fireFunction;
+    };
+
+    Task::Task(FireFunction f, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), fireFunction(f) {}
+
+    void Task::fire()
+    {
+        fireFunction();
+    }
+}
 QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
 
 QueueCleaner::~QueueCleaner()
@@ -37,7 +58,7 @@ QueueCleaner::~QueueCleaner()
 void QueueCleaner::start(qpid::sys::Duration p)
 {
     period = p;
-    task = new Task(*this, p);
+    task = new Task(boost::bind(&QueueCleaner::fired, this), p);
     timer->add(task);
 }
 
@@ -45,14 +66,6 @@ void QueueCleaner::setTimer(qpid::sys::T
     this->timer = timer;
 }
 
-
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
-
-void QueueCleaner::Task::fire()
-{
-    parent.fired();
-}
-
 namespace {
 struct CollectQueues
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueCleaner.h Fri Jan 25 18:20:39 2013
@@ -23,9 +23,17 @@
  */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/sys/Timer.h"
+#include "qpid/sys/Time.h"
+
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
+
+namespace sys {
+    class Timer;
+    class TimerTask;
+}
+
 namespace broker {
 
 class QueueRegistry;
@@ -39,16 +47,8 @@ class QueueCleaner
     QPID_BROKER_EXTERN ~QueueCleaner();
     QPID_BROKER_EXTERN void start(sys::Duration period);
     QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
-  private:
-    class Task : public sys::TimerTask
-    {
-      public:
-        Task(QueueCleaner& parent, sys::Duration duration);
-        void fire();
-      private:
-        QueueCleaner& parent;
-    };
 
+  private:
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
     sys::Timer* timer;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Jan 25 18:20:39 2013
@@ -65,7 +65,7 @@ namespace {
 QueueFlowLimit::QueueFlowLimit(Queue *_queue,
                                uint32_t _flowStopCount, uint32_t _flowResumeCount,
                                uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
+    : queue(_queue), queueName("<unknown>"),
       flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0), broker(0)
@@ -129,11 +129,6 @@ void QueueFlowLimit::enqueued(const Mess
     }
 
     if (flowStopped || !index.empty()) {
-        // ignore flow control if we are populating the queue due to cluster replication:
-        if (broker && broker->isClusterUpdatee()) {
-            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
-            return;
-        }
         QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
         msg.getPersistentContext()->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         bool unique;
@@ -296,79 +291,8 @@ QueueFlowLimit *QueueFlowLimit::createLi
     return 0;
 }
 
-/* Cluster replication */
-
-namespace {
-    /** pack a set of sequence number ranges into a framing::Array */
-    void buildSeqRangeArray(qpid::framing::Array *seqs,
-                            const qpid::framing::SequenceNumber& first,
-                            const qpid::framing::SequenceNumber& last)
-    {
-        seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
-        seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
-    }
-}
-
-/** Runs on UPDATER to snapshot current state */
-void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
-{
-    sys::Mutex::ScopedLock l(indexLock);
-    state.clear();
-
-    framing::SequenceSet ss;
-    if (!index.empty()) {
-        /* replicate the set of messages pending flow control */
-        for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
-             itr != index.end(); ++itr) {
-            ss.add(itr->first);
-        }
-        framing::Array seqs(TYPE_CODE_UINT32);
-        typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder;
-        ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
-        state.setArray("pendingMsgSeqs", seqs);
-    }
-    QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
-{
-    sys::Mutex::ScopedLock l(indexLock);
-    index.clear();
-
-    framing::SequenceSet fcmsg;
-    framing::Array seqArray(TYPE_CODE_UINT32);
-    if (state.getArray("pendingMsgSeqs", seqArray)) {
-        assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
-        framing::Array::const_iterator i = seqArray.begin();
-        while (i != seqArray.end()) {
-            framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
-            ++i;
-            framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
-            ++i;
-            fcmsg.add(first, last);
-            for (SequenceNumber seq = first; seq <= last; ++seq) {
-                Message msg;
-                queue->find(seq, msg);   // fyi: may not be found if msg is acquired & unacked
-                bool unique;
-                unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
-                // Like this to avoid tripping up unused variable warning when NDEBUG set
-                if (!unique) assert(unique);
-            }
-        }
-    }
-
-    flowStopped = index.size() != 0;
-    if (queueMgmtObj) {
-        queueMgmtObj->set_flowStopped(isFlowControlActive());
-    }
-    QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
-}
-
-
 namespace qpid {
-    namespace broker {
+namespace broker {
 
 std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
 {
@@ -377,6 +301,6 @@ std::ostream& operator<<(std::ostream& o
     return out;
 }
 
-    }
+}
 }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Fri Jan 25 18:20:39 2013
@@ -26,7 +26,7 @@
 #include <iostream>
 #include <memory>
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
@@ -40,6 +40,7 @@ namespace broker {
 
 class Broker;
 class Queue;
+class Message;
 struct QueueSettings;
 
 /**
@@ -49,7 +50,7 @@ struct QueueSettings;
  * passing _either_ level may turn flow control ON, but _both_ must be
  * below level before flow control will be turned OFF.
  */
- class QueueFlowLimit : public StatefulQueueObserver
+ class QueueFlowLimit : public QueueObserver
 {
     static uint64_t defaultMaxSize;
     static uint defaultFlowStopRatio;
@@ -84,10 +85,6 @@ struct QueueSettings;
     QPID_BROKER_EXTERN void acquired(const Message&) {};
     QPID_BROKER_EXTERN void requeued(const Message&) {};
 
-    /** for clustering: */
-    QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
-    QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&);
-
     uint32_t getFlowStopCount() const { return flowStopCount; }
     uint32_t getFlowResumeCount() const { return flowResumeCount; }
     uint64_t getFlowStopSize() const { return flowStopSize; }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Jan 25 18:20:39 2013
@@ -60,10 +60,8 @@ QueueRegistry::declare(const string& nam
         if (i == queues.end()) {
             Queue::shared_ptr queue = create(name, settings);
             //Move this to factory also?
-            if (alternate) {
+            if (alternate)
                 queue->setAlternateExchange(alternate);//need to do this *before* create
-                alternate->incAlternateUsers();
-            }
             if (!recovering) {
                 //create persistent record if required
                 queue->create();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Fri Jan 25 18:20:39 2013
@@ -24,6 +24,7 @@
 #endif
 
 #include "qpid/broker/AclModule.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -169,14 +170,8 @@ void SaslAuthenticator::fini(void)
 std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
 {
     if (c.getBroker().getOptions().auth) {
-        // The cluster creates non-authenticated connections for internal shadow connections
-        // that are never connected to an external client.
-        if ( !c.isAuthenticated() )
-            return std::auto_ptr<SaslAuthenticator>(
-                new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
-        else
-            return std::auto_ptr<SaslAuthenticator>(
-                new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+        return std::auto_ptr<SaslAuthenticator>(
+            new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
     } else {
         QPID_LOG(debug, "SASL: No Authentication Performed");
         return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Fri Jan 25 18:20:39 2013
@@ -19,12 +19,14 @@
  *
  */
 #include "qpid/broker/SecureConnectionFactory.h"
-#include "qpid/framing/ProtocolVersion.h"
+
 #include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/SecureConnection.h"
-#include "qpid/sys/SecuritySettings.h"
+#include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/SecuritySettings.h"
 
 namespace qpid {
 namespace broker {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jan 25 18:20:39 2013
@@ -20,6 +20,8 @@
  */
 
 #include "qpid/broker/SessionState.h"
+
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DtxAck.h"
@@ -283,7 +285,7 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent,
                                           const string& _name,
                                           Queue::shared_ptr _queue,
                                           bool ack,
@@ -326,12 +328,12 @@ Consumer(_name, type),
     }
 }
 
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const
 {
     return mgmtObject;
 }
 
-Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
 
@@ -341,16 +343,16 @@ Manageable::status_t SemanticState::Cons
 }
 
 
-OwnershipToken* SemanticState::ConsumerImpl::getSession()
+OwnershipToken* SemanticStateConsumerImpl::getSession()
 {
     return &(parent->session);
 }
 
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
 {
     return deliver(cursor, msg, shared_from_this());
 }
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
 {
     allocateCredit(msg);
     boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
@@ -375,12 +377,12 @@ bool SemanticState::ConsumerImpl::delive
     return true;
 }
 
-bool SemanticState::ConsumerImpl::filter(const Message&)
+bool SemanticStateConsumerImpl::filter(const Message&)
 {
     return true;
 }
 
-bool SemanticState::ConsumerImpl::accept(const Message& msg)
+bool SemanticStateConsumerImpl::accept(const Message& msg)
 {
     // TODO aconway 2009-06-08: if we have byte & message credit but
     // checkCredit fails because the message is to big, we should
@@ -393,8 +395,8 @@ bool SemanticState::ConsumerImpl::accept
 
 namespace {
 struct ConsumerName {
-    const SemanticState::ConsumerImpl& consumer;
-    ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+    const SemanticStateConsumerImpl& consumer;
+    ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {}
 };
 
 ostream& operator<<(ostream& o, const ConsumerName& pc) {
@@ -403,7 +405,7 @@ ostream& operator<<(ostream& o, const Co
 }
 }
 
-void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
+void SemanticStateConsumerImpl::allocateCredit(const Message& msg)
 {
     Credit original = credit;
     boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
@@ -413,7 +415,7 @@ void SemanticState::ConsumerImpl::alloca
 
 }
 
-bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
+bool SemanticStateConsumerImpl::checkCredit(const Message& msg)
 {
     boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
     bool enoughCredit = credit.check(1, transfer->getRequiredCredit());
@@ -423,7 +425,7 @@ bool SemanticState::ConsumerImpl::checkC
     return enoughCredit;
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticStateConsumerImpl::~SemanticStateConsumerImpl()
 {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -496,7 +498,7 @@ void SemanticState::requestDispatch()
         i->second->requestDispatch();
 }
 
-void SemanticState::ConsumerImpl::requestDispatch()
+void SemanticStateConsumerImpl::requestDispatch()
 {
     if (blocked) {
         parent->session.getConnection().outputTasks.addOutputTask(this);
@@ -514,7 +516,7 @@ bool SemanticState::complete(DeliveryRec
     return delivery.isRedundant();
 }
 
-void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
+void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery)
 {
     if (!delivery.isComplete()) {
         delivery.complete();
@@ -539,7 +541,7 @@ SessionContext& SemanticState::getSessio
 const SessionContext& SemanticState::getSession() const { return session; }
 
 
-const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
+const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
 {
     ConsumerImpl::shared_ptr consumer;
     if (!find(destination, consumer)) {
@@ -596,7 +598,7 @@ void SemanticState::stop(const std::stri
     find(destination)->stop();
 }
 
-void SemanticState::ConsumerImpl::setWindowMode()
+void SemanticStateConsumerImpl::setWindowMode()
 {
     credit.setWindowMode(true);
     if (mgmtObject){
@@ -604,7 +606,7 @@ void SemanticState::ConsumerImpl::setWin
     }
 }
 
-void SemanticState::ConsumerImpl::setCreditMode()
+void SemanticStateConsumerImpl::setCreditMode()
 {
     credit.setWindowMode(false);
     if (mgmtObject){
@@ -612,17 +614,17 @@ void SemanticState::ConsumerImpl::setCre
     }
 }
 
-void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticStateConsumerImpl::addByteCredit(uint32_t value)
 {
     credit.addByteCredit(value);
 }
 
-void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticStateConsumerImpl::addMessageCredit(uint32_t value)
 {
     credit.addMessageCredit(value);
 }
 
-bool SemanticState::ConsumerImpl::haveCredit()
+bool SemanticStateConsumerImpl::haveCredit()
 {
     if (credit) {
         return true;
@@ -632,19 +634,19 @@ bool SemanticState::ConsumerImpl::haveCr
     }
 }
 
-bool SemanticState::ConsumerImpl::doDispatch()
+bool SemanticStateConsumerImpl::doDispatch()
 {
     return queue->dispatch(shared_from_this());
 }
 
-void SemanticState::ConsumerImpl::flush()
+void SemanticStateConsumerImpl::flush()
 {
     while(haveCredit() && doDispatch())
         ;
     credit.cancel();
 }
 
-void SemanticState::ConsumerImpl::stop()
+void SemanticStateConsumerImpl::stop()
 {
     credit.cancel();
 }
@@ -699,7 +701,7 @@ void SemanticState::reject(DeliveryId fi
     getSession().setUnackedCount(unacked.size());
 }
 
-bool SemanticState::ConsumerImpl::doOutput()
+bool SemanticStateConsumerImpl::doOutput()
 {
     try {
         return haveCredit() && doDispatch();
@@ -708,24 +710,24 @@ bool SemanticState::ConsumerImpl::doOutp
     }
 }
 
-void SemanticState::ConsumerImpl::enableNotify()
+void SemanticStateConsumerImpl::enableNotify()
 {
     Mutex::ScopedLock l(lock);
     notifyEnabled = true;
 }
 
-void SemanticState::ConsumerImpl::disableNotify()
+void SemanticStateConsumerImpl::disableNotify()
 {
     Mutex::ScopedLock l(lock);
     notifyEnabled = false;
 }
 
-bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
+bool SemanticStateConsumerImpl::isNotifyEnabled() const {
     Mutex::ScopedLock l(lock);
     return notifyEnabled;
 }
 
-void SemanticState::ConsumerImpl::notify()
+void SemanticStateConsumerImpl::notify()
 {
     Mutex::ScopedLock l(lock);
     if (notifyEnabled) {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h Fri Jan 25 18:20:39 2013
@@ -76,105 +76,16 @@ class SessionState;
  * called when a client's socket is ready to write data.
  *
  */
+class SemanticStateConsumerImpl;
 class SemanticState : private boost::noncopyable {
-  public:
-    class ConsumerImpl : public Consumer, public sys::OutputTask,
-                         public boost::enable_shared_from_this<ConsumerImpl>,
-                         public management::Manageable
-    {
-      protected:
-        mutable qpid::sys::Mutex lock;
-        SemanticState* const parent;
-      private:
-        const boost::shared_ptr<Queue> queue;
-        const bool ackExpected;
-        const bool acquire;
-        bool blocked;
-        bool exclusive;
-        std::string resumeId;
-        const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
-        uint64_t resumeTtl;
-        framing::FieldTable arguments;
-        Credit credit;
-        bool notifyEnabled;
-        const int syncFrequency;
-        int deliveryCount;
-        qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
-        ProtocolRegistry& protocols;
-
-        bool checkCredit(const Message& msg);
-        void allocateCredit(const Message& msg);
-        bool haveCredit();
-
-      protected:
-        QPID_BROKER_EXTERN virtual bool doDispatch();
-        size_t unacked() { return parent->unacked.size(); }
-        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
-
-      public:
-        typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
-
-        QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
-                     const std::string& name, boost::shared_ptr<Queue> queue,
-                     bool ack, SubscriptionType type, bool exclusive,
-                     const std::string& tag, const std::string& resumeId,
-                     uint64_t resumeTtl, const framing::FieldTable& arguments);
-        QPID_BROKER_EXTERN ~ConsumerImpl();
-        QPID_BROKER_EXTERN OwnershipToken* getSession();
-        QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
-        QPID_BROKER_EXTERN bool filter(const Message&);
-        QPID_BROKER_EXTERN bool accept(const Message&);
-        QPID_BROKER_EXTERN void cancel() {}
-
-        QPID_BROKER_EXTERN void disableNotify();
-        QPID_BROKER_EXTERN void enableNotify();
-        QPID_BROKER_EXTERN void notify();
-        QPID_BROKER_EXTERN bool isNotifyEnabled() const;
-
-        QPID_BROKER_EXTERN void requestDispatch();
-
-        QPID_BROKER_EXTERN void setWindowMode();
-        QPID_BROKER_EXTERN void setCreditMode();
-        QPID_BROKER_EXTERN void addByteCredit(uint32_t value);
-        QPID_BROKER_EXTERN void addMessageCredit(uint32_t value);
-        QPID_BROKER_EXTERN void flush();
-        QPID_BROKER_EXTERN void stop();
-        QPID_BROKER_EXTERN void complete(DeliveryRecord&);
-        boost::shared_ptr<Queue> getQueue() const { return queue; }
-        bool isBlocked() const { return blocked; }
-        bool setBlocked(bool set) { std::swap(set, blocked); return set; }
-
-        QPID_BROKER_EXTERN bool doOutput();
-
-        Credit& getCredit() { return credit; }
-        const Credit& getCredit() const { return credit; }
-        bool isAckExpected() const { return ackExpected; }
-        bool isAcquire() const { return acquire; }
-        bool isExclusive() const { return exclusive; }
-        std::string getResumeId() const { return resumeId; };
-        const std::string& getTag() const { return tag; }
-        uint64_t getResumeTtl() const { return resumeTtl; }
-	uint32_t getDeliveryCount() const { return deliveryCount; }
-	void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
-        const framing::FieldTable& getArguments() const { return arguments; }
-
-        SemanticState& getParent() { return *parent; }
-        const SemanticState& getParent() const { return *parent; }
-
-        void acknowledged(const DeliveryRecord&) {}
-
-        // manageable entry points
-        QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
-        GetManagementObject(void) const;
-
-        QPID_BROKER_EXTERN management::Manageable::status_t
-        ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
-    };
+  friend class SemanticStateConsumerImpl;
 
+  public:
+    typedef SemanticStateConsumerImpl ConsumerImpl;
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
   private:
-    typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+    typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap;
     typedef boost::tuple<std::string, std::string, std::string, std::string> Binding;
     typedef std::set<Binding> Bindings;
 
@@ -201,8 +112,8 @@ class SemanticState : private boost::non
     bool complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
-    void cancel(ConsumerImpl::shared_ptr);
-    void disable(ConsumerImpl::shared_ptr);
+    void cancel(boost::shared_ptr<ConsumerImpl>);
+    void disable(boost::shared_ptr<ConsumerImpl>);
     void unbindSessionBindings();
 
   public:
@@ -213,8 +124,8 @@ class SemanticState : private boost::non
     SessionContext& getSession();
     const SessionContext& getSession() const;
 
-    const ConsumerImpl::shared_ptr find(const std::string& destination) const;
-    bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
+    const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const;
+    bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const;
 
     /**
      * Get named queue, never returns 0.
@@ -264,11 +175,6 @@ class SemanticState : private boost::non
     void detached();
     void closed();
 
-    // Used by cluster to re-create sessions
-    template <class F> void eachConsumer(F f) {
-        for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i)
-            f(i->second);
-    }
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
@@ -285,6 +191,99 @@ class SemanticState : private boost::non
                       const std::string& routingKey);
 };
 
+class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask,
+                        public boost::enable_shared_from_this<SemanticStateConsumerImpl>,
+                        public management::Manageable
+{
+    protected:
+    mutable qpid::sys::Mutex lock;
+    SemanticState* const parent;
+    private:
+    const boost::shared_ptr<Queue> queue;
+    const bool ackExpected;
+    const bool acquire;
+    bool blocked;
+    bool exclusive;
+    std::string resumeId;
+    const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
+    uint64_t resumeTtl;
+    framing::FieldTable arguments;
+    Credit credit;
+    bool notifyEnabled;
+    const int syncFrequency;
+    int deliveryCount;
+    qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
+    ProtocolRegistry& protocols;
+
+    bool checkCredit(const Message& msg);
+    void allocateCredit(const Message& msg);
+    bool haveCredit();
+
+    protected:
+    QPID_BROKER_EXTERN virtual bool doDispatch();
+    size_t unacked() { return parent->unacked.size(); }
+    QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
+
+    public:
+    typedef boost::shared_ptr<SemanticStateConsumerImpl> shared_ptr;
+
+    QPID_BROKER_EXTERN SemanticStateConsumerImpl(SemanticState* parent,
+                    const std::string& name, boost::shared_ptr<Queue> queue,
+                    bool ack, SubscriptionType type, bool exclusive,
+                    const std::string& tag, const std::string& resumeId,
+                    uint64_t resumeTtl, const framing::FieldTable& arguments);
+    QPID_BROKER_EXTERN ~SemanticStateConsumerImpl();
+    QPID_BROKER_EXTERN OwnershipToken* getSession();
+    QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+    QPID_BROKER_EXTERN bool filter(const Message&);
+    QPID_BROKER_EXTERN bool accept(const Message&);
+    QPID_BROKER_EXTERN void cancel() {}
+
+    QPID_BROKER_EXTERN void disableNotify();
+    QPID_BROKER_EXTERN void enableNotify();
+    QPID_BROKER_EXTERN void notify();
+    QPID_BROKER_EXTERN bool isNotifyEnabled() const;
+
+    QPID_BROKER_EXTERN void requestDispatch();
+
+    QPID_BROKER_EXTERN void setWindowMode();
+    QPID_BROKER_EXTERN void setCreditMode();
+    QPID_BROKER_EXTERN void addByteCredit(uint32_t value);
+    QPID_BROKER_EXTERN void addMessageCredit(uint32_t value);
+    QPID_BROKER_EXTERN void flush();
+    QPID_BROKER_EXTERN void stop();
+    QPID_BROKER_EXTERN void complete(DeliveryRecord&);
+    boost::shared_ptr<Queue> getQueue() const { return queue; }
+    bool isBlocked() const { return blocked; }
+    bool setBlocked(bool set) { std::swap(set, blocked); return set; }
+
+    QPID_BROKER_EXTERN bool doOutput();
+
+    Credit& getCredit() { return credit; }
+    const Credit& getCredit() const { return credit; }
+    bool isAckExpected() const { return ackExpected; }
+    bool isAcquire() const { return acquire; }
+    bool isExclusive() const { return exclusive; }
+    std::string getResumeId() const { return resumeId; };
+    const std::string& getTag() const { return tag; }
+    uint64_t getResumeTtl() const { return resumeTtl; }
+    uint32_t getDeliveryCount() const { return deliveryCount; }
+    void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
+    const framing::FieldTable& getArguments() const { return arguments; }
+
+    SemanticState& getParent() { return *parent; }
+    const SemanticState& getParent() const { return *parent; }
+
+    void acknowledged(const DeliveryRecord&) {}
+
+    // manageable entry points
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
+    GetManagementObject(void) const;
+
+    QPID_BROKER_EXTERN management::Manageable::status_t
+    ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
+};
+
 }} // namespace qpid::broker
 
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Jan 25 18:20:39 2013
@@ -16,7 +16,10 @@
  *
  */
 #include "qpid/broker/SessionAdapter.h"
+
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Jan 25 18:20:39 2013
@@ -19,8 +19,9 @@
  */
 
 #include "qpid/broker/SessionHandler.h"
-#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
@@ -34,9 +35,7 @@ using namespace qpid::sys;
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
     : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c),
-      proxy(out),
-      clusterOrderProxy(c.getClusterOrderOutput() ?
-                        new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
+      proxy(out)
 {}
 
 SessionHandler::~SessionHandler() {}
@@ -110,10 +109,7 @@ void SessionHandler::attachAs(const std:
 {
     SessionId id(connection.getUserId(), name);
     SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
-    // Delay creating management object till attached(). In a cluster,
-    // only the active link broker calls attachAs but all brokers
-    // receive the subsequent attached() call.
-    session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
+    session.reset(new SessionState(connection.getBroker(), *this, id, config));
     sendAttach(false);
 }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Jan 25 18:20:39 2013
@@ -71,17 +71,6 @@ class SessionHandler : public qpid::amqp
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
 
-    /**
-     * If commands are sent based on the local time (e.g. in timers), they don't have
-     * a well-defined ordering across cluster nodes.
-     * This proxy is for sending such commands. In a clustered broker it will take steps
-     * to synchronize command order across the cluster. In a stand-alone broker
-     * it is just a synonym for getProxy()
-     */
-    framing::AMQP_ClientProxy& getClusterOrderProxy() {
-        return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
-    }
-
     virtual void handleDetach();
     void attached(const std::string& name);//used by 'pushing' inter-broker bridges
     void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
@@ -108,7 +97,6 @@ class SessionHandler : public qpid::amqp
     Connection& connection;
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
-    std::auto_ptr<SetChannelProxy> clusterOrderProxy;
     boost::shared_ptr<ErrorListener> errorListener;
 };
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org