You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/01/25 19:20:49 UTC

svn commit: r1438629 [5/10] - in /qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qpid/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/t/ qpid/cpp/bindi...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jan 25 18:20:39 2013
@@ -19,6 +19,12 @@
  *
  */
 #include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include <boost/bind.hpp>
 #include <iostream>
 #include <iterator>
@@ -26,37 +32,57 @@
 namespace qpid {
 namespace ha {
 
+namespace _qmf = ::qmf::org::apache::qpid::ha;
 
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+    : haBroker(b), self(info.getSystemId())
+{
+    brokers[self] = info;
+}
+
+void Membership::clear() {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo me = brokers[self];
     brokers.clear();
-    brokers[b.getSystemId()] = b;
+    brokers[self] = me;
 }
 
 void Membership::add(const BrokerInfo& b) {
+    Mutex::ScopedLock l(lock);
     brokers[b.getSystemId()] = b;
+    update(l);
 }
 
 
 void Membership::remove(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i != brokers.end()) {
         brokers.erase(i);
-        }
+        update(l);
+    }
 }
 
 bool Membership::contains(const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
     return brokers.find(id) != brokers.end();
 }
 
 void Membership::assign(const types::Variant::List& list) {
+    Mutex::ScopedLock l(lock);
     brokers.clear();
     for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
         BrokerInfo b(i->asMap());
         brokers[b.getSystemId()] = b;
     }
+    update(l);
 }
 
 types::Variant::List Membership::asList() const {
+    Mutex::ScopedLock l(lock);
     types::Variant::List list;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         list.push_back(i->second.asMap());
@@ -64,6 +90,7 @@ types::Variant::List Membership::asList(
 }
 
 BrokerInfo::Set Membership::otherBackups() const {
+    Mutex::ScopedLock l(lock);
     BrokerInfo::Set result;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
         if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups
     return result;
 }
 
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
-    BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(id);
     if (i == brokers.end()) return false;
     result = i->second;
     return true;
 }
 
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
-    return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+    QPID_LOG(info, "Membership: " <<  brokers);
+    Variant::List brokers = asList();
+    if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+    if (mgmtObject) mgmtObject->set_members(brokers);
+    haBroker.getBroker().getManagementAgent()->raiseEvent(
+        _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+    Mutex::ScopedLock l(lock);
+    mgmtObject = mo;
+    update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+    // Legal state transitions. JOINING is entered from STANDALONE at initialization; ACTIVE is terminal.
+    static const BrokerStatus TRANSITIONS[][2] = {
+        { STANDALONE, JOINING }, // Initialization of backup broker
+        { JOINING, CATCHUP },    // Connected to primary
+        { JOINING, RECOVERING }, // Chosen as initial primary.
+        { CATCHUP, READY },      // Caught up all queues, ready to take over.
+        { READY, RECOVERING },   // Chosen as new primary
+        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
+        { RECOVERING, ACTIVE }   // All expected backups are ready
+    };
+    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+    for (size_t i = 0; i < N; ++i) {
+        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+            return true;
+    }
+    return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+    BrokerStatus status = getStatus();
+    QPID_LOG(info, "Status change: "
+             << printable(status) << " -> " << printable(newStatus));
+    bool legal = checkTransition(status, newStatus);
+    if (!legal) {
+        haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+                                 << " -> " << printable(newStatus)));
+    }
+
+    Mutex::ScopedLock l(lock);
+    brokers[self].setStatus(newStatus);
+    if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+    update(l);
+}
+
+BrokerStatus Membership::getStatus() const  {
+    Mutex::ScopedLock l(lock);
+    return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const  {
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const  {
+    Mutex::ScopedLock l(lock);
+    BrokerInfo::Map::const_iterator i = brokers.find(self);
+    assert(i != brokers.end());
+    return i->second;
 }
 
+// FIXME aconway 2013-01-23: move to .h?
 }} // namespace qpid::ha

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h Fri Jan 25 18:20:39 2013
@@ -24,45 +24,72 @@
 
 #include "BrokerInfo.h"
 #include "types.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/types/Variant.h"
 #include <boost/function.hpp>
 #include <set>
 #include <vector>
 #include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
 namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
 namespace ha {
+class HaBroker;
 
 /**
  * Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management events when the membership changes.
+ * THREAD SAFE
  */
 class Membership
 {
   public:
-    Membership(const types::Uuid& self_) : self(self_) {}
+    Membership(const BrokerInfo& info, HaBroker&);
+
+    void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
 
-    void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+    void clear();               ///< Clear all but self.
     void add(const BrokerInfo& b);
     void remove(const types::Uuid& id);
     bool contains(const types::Uuid& id);
+
     /** Return IDs of all READY backups other than self */
     BrokerInfo::Set otherBackups() const;
 
     void assign(const types::Variant::List&);
     types::Variant::List asList() const;
 
-    bool get(const types::Uuid& id, BrokerInfo& result);
+    bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+    types::Uuid getSelf() const  { return self; }
+    BrokerInfo getInfo() const;
+    BrokerStatus getStatus() const;
+    void setStatus(BrokerStatus s);
 
   private:
-    types::Uuid self;
+    void update(sys::Mutex::ScopedLock&);
+    BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+    mutable sys::Mutex lock;
+    HaBroker& haBroker;
+    boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+    const types::Uuid self;
     BrokerInfo::Map brokers;
-    friend std::ostream& operator<<(std::ostream&, const Membership&);
 };
 
-std::ostream& operator<<(std::ostream&, const Membership&);
-
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_MEMBERSHIP_H*/

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jan 25 18:20:39 2013
@@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public s
 Primary* Primary::instance = 0;
 
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
-    haBroker(hb), logPrefix("Primary: "), active(false)
+    haBroker(hb), membership(hb.getMembership()),
+    logPrefix("Primary: "), active(false)
 {
+    hb.getMembership().setStatus(RECOVERING);
     assert(instance == 0);
     instance = this;            // Let queue replicators find us.
     if (expect.empty()) {
@@ -96,7 +98,7 @@ Primary::Primary(HaBroker& hb, const Bro
         QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
             boost::shared_ptr<RemoteBackup> backup(
-                new RemoteBackup(*i, haBroker.getReplicationTest(), false));
+                new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
             backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const Bro
         hb.getBroker().getTimer().add(timerTask);
     }
 
+
+    // Remove backup tag property from outgoing link properties.
+    framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+    linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+    hb.getBroker().setLinkClientProperties(linkProperties);
+
     configurationObserver.reset(new PrimaryConfigurationObserver(*this));
     haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
 
     Mutex::ScopedLock l(lock);  // We are now active as a configurationObserver
     checkReady(l);
+
     // Allow client connections
     connectionObserver.reset(new PrimaryConnectionObserver(*this));
     haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
@@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLo
         active = true;
         Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
         QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
-        haBroker.activate();
+        membership.setStatus(ACTIVE);
     }
 }
 
@@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iter
     if (i != backups.end() && i->second->reportReady()) {
         BrokerInfo info = i->second->getBrokerInfo();
         info.setStatus(READY);
-        haBroker.addBroker(info);
+        membership.add(info);
         if (expectedBackups.erase(i->second)) {
             QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
             checkReady(l);
@@ -161,9 +170,10 @@ void Primary::timeoutExpectedBackups() {
                 expectedBackups.erase(i++);
                 backups.erase(info.getSystemId());
                 rb->cancel();
-                // Downgrade the broker to CATCHUP
+                // Downgrade the broker's status to CATCHUP
+                // The broker will get this status change when it eventually connects.
                 info.setStatus(CATCHUP);
-                haBroker.addBroker(info);
+                membership.add(info);
             }
             else ++i;
         }
@@ -228,7 +238,7 @@ void Primary::opened(broker::Connection&
         if (i == backups.end()) {
             QPID_LOG(info, logPrefix << "New backup connected: " << info);
             boost::shared_ptr<RemoteBackup> backup(
-                new RemoteBackup(info, haBroker.getReplicationTest(), true));
+                new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
             {
                 // Avoid deadlock with queue registry lock.
                 Mutex::ScopedUnlock u(lock);
@@ -238,11 +248,11 @@ void Primary::opened(broker::Connection&
         }
         else {
             QPID_LOG(info, logPrefix << "Known backup connected: " << info);
-            i->second->setConnected(true);
+            i->second->setConnection(&connection);
             checkReady(i, l);
         }
         if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
-        haBroker.addBroker(info);
+        membership.add(info);
     }
     else
         QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -259,7 +269,7 @@ void Primary::closed(broker::Connection&
         // Checking  isConnected() lets us ignore such spurious closes.
         if (i != backups.end() && i->second->isConnected()) {
             QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
-            haBroker.removeBroker(info.getSystemId());
+            membership.remove(info.getSystemId());
             expectedBackups.erase(i->second);
             backups.erase(i);
             checkReady(l);
@@ -275,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::g
     return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
 }
 
+Role* Primary::promote() {
+    QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+    return 0;
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h Fri Jan 25 18:20:39 2013
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "BrokerInfo.h"
+#include "Role.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -48,6 +49,7 @@ class HaBroker;
 class ReplicatingSubscription;
 class RemoteBackup;
 class QueueGuard;
+class Membership;
 
 /**
  * State associated with a primary broker:
@@ -56,7 +58,7 @@ class QueueGuard;
  *
  * THREAD SAFE: called concurrently in arbitrary connection threads.
  */
-class Primary
+class Primary : public Role
 {
   public:
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
@@ -67,6 +69,11 @@ class Primary
     Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
     ~Primary();
 
+    // Role implementation
+    std::string getLogPrefix() const { return logPrefix; }
+    Role* promote();
+    void setBrokerUrl(const Url&) {}
+
     void readyReplica(const ReplicatingSubscription&);
     void removeReplica(const std::string& q);
 
@@ -94,12 +101,13 @@ class Primary
 
     sys::Mutex lock;
     HaBroker& haBroker;
+    Membership& membership;
     std::string logPrefix;
     bool active;
     /**
      * Set of expected backups that must be ready before we declare ourselves
-     * active. These are backups that were known before the primary crashed. As
-     * new primary we expect them to re-connect.
+     * active. These are backups that were known and ready before the primary
+     * crashed. As new primary we expect them to re-connect.
      */
     BackupSet expectedBackups;
     /**

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Jan 25 18:20:39 2013
@@ -66,7 +66,7 @@ QueueGuard::~QueueGuard() { cancel(); }
 // NOTE: Called with message lock held.
 void QueueGuard::enqueued(const Message& m) {
     // Delay completion
-    QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+    QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
     m.getIngressCompletion()->startCompleter();
     {
         Mutex::ScopedLock l(lock);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Jan 25 18:20:39 2013
@@ -260,9 +260,7 @@ void QueueReplicator::route(Deliverable&
         // Ignore unknown event keys, may be introduced in later versions.
     }
     catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
-        haBroker.shutdown();
-        throw;
+        haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
     }
 }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Jan 25 18:20:39 2013
@@ -21,6 +21,7 @@
 #include "RemoteBackup.h"
 #include "QueueGuard.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/log/Statement.h"
@@ -32,9 +33,10 @@ namespace ha {
 using sys::Mutex;
 using boost::bind;
 
-RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
-    logPrefix("Primary: Remote backup "+info.getLogId()+": "),
-    brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
+RemoteBackup::RemoteBackup(
+    const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
+    brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
 {}
 
 void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -46,13 +48,19 @@ void RemoteBackup::setCatchupQueues(brok
 RemoteBackup::~RemoteBackup() { cancel(); }
 
 void RemoteBackup::cancel() {
+    QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
+             << " backup: " << brokerInfo);
     for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
         i->second->cancel();
     guards.clear();
+    if (connection) {
+        connection->abort();
+        connection = 0;
+    }
 }
 
 bool RemoteBackup::isReady() {
-    return connected && catchupQueues.empty();
+    return connection && catchupQueues.empty();
 }
 
 void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h Fri Jan 25 18:20:39 2013
@@ -33,6 +33,7 @@ namespace qpid {
 namespace broker {
 class Queue;
 class QueueRegistry;
+class Connection;
 }
 
 namespace ha {
@@ -54,7 +55,7 @@ class RemoteBackup
     /** Note: isReady() can be true after construction
      *@param connection the backup's connection, or 0 if it is not yet connected.
      */
-    RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
+    RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
     ~RemoteBackup();
 
     /** Set all queues in the registry as catch-up queues.
@@ -66,8 +67,8 @@ class RemoteBackup
     GuardPtr guard(const QueuePtr&);
 
     /** Is the remote backup connected? */
-    void setConnected(bool b) { connected=b; }
-    bool isConnected() const { return connected; }
+    void setConnection(broker::Connection* c) { connection = c; }
+    bool isConnected() const { return connection; }
 
     /** ReplicatingSubscription associated with queue is ready.
      * Note: may set isReady()
@@ -101,7 +102,7 @@ class RemoteBackup
     ReplicationTest replicationTest;
     GuardMap guards;
     QueueSet catchupQueues;
-    bool connected;
+    broker::Connection* connection;
     bool reportedReady;
 };
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Jan 25 18:20:39 2013
@@ -65,6 +65,8 @@ class QueueGuard;
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
 {
   public:
+    typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
     struct Factory : public broker::ConsumerFactory {
         boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
             broker::SemanticState* parent,

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Jan 25 18:20:39 2013
@@ -32,6 +32,11 @@
 namespace qpid {
 namespace ha {
 
+// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// against bad promotion if there are READY brokers when this broker starts.
+// It will not help in the situation where brokers become READY after this one starts.
+//
+
 /**
  * Check whether a JOINING broker can be promoted.
  *
@@ -49,8 +54,10 @@ class StatusCheck
     ~StatusCheck();
     void setUrl(const Url&);
     bool canPromote();
-    void setPromote(bool p);
+
   private:
+    void setPromote(bool p);
+
     std::string logPrefix;
     sys::Mutex lock;
     std::vector<sys::Thread> threads;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Jan 25 18:20:39 2013
@@ -28,11 +28,13 @@
 #include "qpid/management/ManagementObject.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/broker/ConnectionState.h"
@@ -47,6 +49,9 @@
 #include <sstream>
 #include <typeinfo>
 
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
 namespace qpid {
 namespace management {
 
@@ -92,6 +97,32 @@ struct ScopedManagementContext
         setManagementExecutionContext(0);
     }
 };
+
+typedef boost::function0<void> FireFunction;
+struct Periodic : public qpid::sys::TimerTask
+{
+    FireFunction fireFunction;
+    qpid::sys::Timer* timer;
+
+    Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds);
+    virtual ~Periodic ();
+    void fire ();
+};
+
+Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds)
+    : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+                "ManagementAgent::periodicProcessing"),
+      fireFunction(f), timer(t) {}
+
+Periodic::~Periodic() {}
+
+void Periodic::fire()
+{
+    setupNextFire();
+    timer->add(this);
+    fireFunction();
+}
+
 }
 
 
@@ -170,6 +201,8 @@ void ManagementAgent::configure(const st
     sendQueue.reset(
         new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller()));
     sendQueue->start();
+    timer          = &broker->getTimer();
+    timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
 
     // Get from file or generate and save to file.
     if (dataDir.empty())
@@ -212,13 +245,6 @@ void ManagementAgent::configure(const st
     }
 }
 
-void ManagementAgent::pluginsInitialized() {
-    // Do this here so cluster plugin has the chance to set up the timer.
-    timer          = &broker->getClusterTimer();
-    timer->add(new Periodic(*this, interval));
-}
-
-
 void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
 {
     if (vendor.find(':') != vendor.npos) {
@@ -420,20 +446,6 @@ void ManagementAgent::raiseEvent(const M
     }
 }
 
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
-    : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
-                "ManagementAgent::periodicProcessing"),
-      agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic() {}
-
-void ManagementAgent::Periodic::fire()
-{
-    setupNextFire();
-    agent.timer->add(this);
-    agent.periodicProcessing();
-}
-
 void ManagementAgent::clientAdded (const string& routingKey)
 {
     sys::Mutex::ScopedLock lock(userLock);
@@ -477,17 +489,6 @@ void ManagementAgent::clientAdded (const
     }
 }
 
-void ManagementAgent::clusterUpdate() {
-    // Called on all cluster memebers when a new member joins a cluster.
-    // Set clientWasAdded so that on the next periodicProcessing we will do 
-    // a full update on all cluster members.
-    sys::Mutex::ScopedLock l(userLock);
-    moveNewObjects();         // keep lists consistent with updater/updatee.
-    moveDeletedObjects();
-    clientWasAdded = true;
-    debugSnapshot("Cluster member joined");
-}
-
 void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
 {
     buf.putOctet ('A');
@@ -2550,70 +2551,6 @@ void ManagementAgent::SchemaClass::mapDe
     }
 }
 
-void ManagementAgent::exportSchemas(string& out) {
-    Variant::List list_;
-    Variant::Map map_, kmap, cmap;
-
-    for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
-        string name = i->first;
-        const ClassMap& classes = i ->second;
-        for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
-            const SchemaClassKey& key = j->first;
-            const SchemaClass& klass = j->second;
-            if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
-                // Encode name, schema-key, schema-class
-
-                map_.clear();
-                kmap.clear();
-                cmap.clear();
-
-                key.mapEncode(kmap);
-                klass.mapEncode(cmap);
-
-                map_["_pname"] = name;
-                map_["_key"] = kmap;
-                map_["_class"] = cmap;
-                list_.push_back(map_);
-            }
-        }
-    }
-
-    ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
-
-    string buf(inBuf.getPointer(), inBuf.available());
-    Variant::List content;
-    ListCodec::decode(buf, content);
-    Variant::List::const_iterator l;
-
-
-    for (l = content.begin(); l != content.end(); l++) {
-        string package;
-        SchemaClassKey key;
-        SchemaClass klass;
-        Variant::Map map_, kmap, cmap;
-        Variant::Map::const_iterator i;
-        
-        map_ = l->asMap();
-
-        if ((i = map_.find("_pname")) != map_.end()) {
-            package = i->second.asString();
-
-            if ((i = map_.find("_key")) != map_.end()) {
-                key.mapDecode(i->second.asMap());
-
-                if ((i = map_.find("_class")) != map_.end()) {
-                    klass.mapDecode(i->second.asMap());
-
-                    packages[package][key] = klass;
-                }
-            }
-        }
-    }
-}
-
 void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
     Variant::Map _objId, _values;
 
@@ -2657,52 +2594,6 @@ void ManagementAgent::RemoteAgent::mapDe
     mgmtObject->set_connectionRef(connectionRef);
 }
 
-void ManagementAgent::exportAgents(string& out) {
-    Variant::List list_;
-    Variant::Map map_, omap, amap;
-
-    for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
-         i != remoteAgents.end();
-         ++i)
-    {
-        // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
-        boost::shared_ptr<RemoteAgent> agent(i->second);
-
-        map_.clear();
-        amap.clear();
-
-        agent->mapEncode(amap);
-        map_["_remote_agent"] = amap;
-        list_.push_back(map_);
-    }
-
-    ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
-    string buf(inBuf.getPointer(), inBuf.available());
-    Variant::List content;
-    ListCodec::decode(buf, content);
-    Variant::List::const_iterator l;
-    sys::Mutex::ScopedLock lock(userLock);
-
-    for (l = content.begin(); l != content.end(); l++) {
-        boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
-        Variant::Map map_;
-        Variant::Map::const_iterator i;
-
-        map_ = l->asMap();
-
-        if ((i = map_.find("_remote_agent")) != map_.end()) {
-
-            agent->mapDecode(i->second.asMap());
-
-            addObject (agent->mgmtObject, 0, false);
-            remoteAgents[agent->connectionRef] = agent;
-        }
-    }
-}
-
 namespace {
 bool isDeletedMap(const ManagementObjectMap::value_type& value) {
     return value.second->isDeleted();
@@ -2781,54 +2672,6 @@ Variant::Map ManagementAgent::toMap(cons
     return map;
 }
 
-
-// Build up a list of the current set of deleted objects that are pending their
-// next (last) publish-ment.
-void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
-{
-    outList.clear();
-
-    sys::Mutex::ScopedLock lock (userLock);
-
-    moveNewObjects();
-    moveDeletedObjects();
-
-    // now copy the pending deletes into the outList
-    for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
-         mIter != pendingDeletedObjs.end(); mIter++) {
-        for (DeletedObjectList::iterator lIter = mIter->second.begin();
-             lIter != mIter->second.end(); lIter++) {
-            outList.push_back(*lIter);
-        }
-    }
-}
-
-// Called by cluster to reset the management agent's list of deleted
-// objects to match the rest of the cluster.
-void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
-{
-    sys::Mutex::ScopedLock lock (userLock);
-    sys::Mutex::ScopedLock objLock(objectLock);
-    // Clear out any existing deleted objects
-    moveNewObjects();
-    pendingDeletedObjs.clear();
-    ManagementObjectMap::iterator i = managementObjects.begin();
-    // Silently drop any deleted objects left over from receiving the update.
-    while (i != managementObjects.end()) {
-        ManagementObject::shared_ptr object = i->second;
-        if (object->isDeleted()) {
-            managementObjects.erase(i++);
-        }
-        else ++i;
-    }
-    for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
-
-        std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
-        pendingDeletedObjs[classkey].push_back(*lIter);
-    }
-}
-
-
 // construct a DeletedObject from a management object.
 ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2)
     : packageName(src->getPackageName()),
@@ -2866,45 +2709,6 @@ ManagementAgent::DeletedObject::DeletedO
     }
 }
 
-
-
-// construct a DeletedObject from an encoded representation. Used by
-// clustering to move deleted objects between clustered brokers.  See
-// DeletedObject::encode() for the reverse.
-ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
-{
-    qpid::types::Variant::Map map_;
-    MapCodec::decode(encoded, map_);
-
-    packageName = map_["_package_name"].getString();
-    className = map_["_class_name"].getString();
-    objectId = map_["_object_id"].getString();
-
-    encodedV1Config = map_["_v1_config"].getString();
-    encodedV1Inst = map_["_v1_inst"].getString();
-    encodedV2 = map_["_v2_data"].asMap();
-}
-
-
-// encode a DeletedObject to a string buffer. Used by
-// clustering to move deleted objects between clustered brokers.  See
-// DeletedObject(const std::string&) for the reverse.
-void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
-{
-    qpid::types::Variant::Map map_;
-
-
-    map_["_package_name"] = packageName;
-    map_["_class_name"] = className;
-    map_["_object_id"] = objectId;
-
-    map_["_v1_config"] = encodedV1Config;
-    map_["_v1_inst"] = encodedV1Inst;
-    map_["_v2_data"] = encodedV2;
-
-    MapCodec::encode(map_, toBuffer);
-}
-
 // Remove Deleted objects, and save for later publishing...
 bool ManagementAgent::moveDeletedObjects() {
     typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList;

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1422061-1438053

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Jan 25 18:20:39 2013
@@ -26,7 +26,6 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/broker/ConnectionToken.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/management/ManagementEvent.h"
@@ -47,6 +46,9 @@ namespace qpid {
 namespace broker {
 class ConnectionState;
 }
+namespace sys {
+class Timer;
+}
 namespace management {
 
 class ManagementAgent
@@ -75,11 +77,6 @@ public:
     /** Called before plugins are initialized */
     void configure       (const std::string& dataDir, bool publish, uint16_t interval,
                           qpid::broker::Broker* broker, int threadPoolSize);
-    /** Called after plugins are initialized. */
-    void pluginsInitialized();
-
-    /** Called by cluster to suppress management output during update. */
-    void suppress(bool s) { suppressed = s; }
 
     void setName(const std::string& vendor,
                  const std::string& product,
@@ -112,8 +109,6 @@ public:
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);
 
-    QPID_BROKER_EXTERN void clusterUpdate();
-
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args,
@@ -123,25 +118,6 @@ public:
     /** Disallow a method. Attempts to call it will receive an exception with message. */
     void disallow(const std::string& className, const std::string& methodName, const std::string& message);
 
-    /** Disallow all QMFv1 methods (used in clustered brokers). */
-    void disallowV1Methods() { disallowAllV1Methods = true; }
-
-    /** Serialize my schemas as a binary blob into schemaOut */
-    void exportSchemas(std::string& schemaOut);
-
-    /** Serialize my remote-agent map as a binary blob into agentsOut */
-    void exportAgents(std::string& agentsOut);
-
-    /** Decode a serialized schemas and add to my schema cache */
-    void importSchemas(framing::Buffer& inBuf);
-
-    /** Decode a serialized agent map */
-    void importAgents(framing::Buffer& inBuf);
-
-    // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
-    uint64_t getNextObjectId(void) { return nextObjectId; }
-    void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
     uint16_t getBootSequence(void) { return bootSequence; }
     void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
 
@@ -150,20 +126,11 @@ public:
 
     static types::Variant::Map toMap(const framing::FieldTable& from);
 
-    // For Clustering: management objects that have been marked as
-    // "deleted", but are waiting for their last published object
-    // update are not visible to the cluster replication code.  These
-    // interfaces allow clustering to gather up all the management
-    // objects that are deleted in order to allow all clustered
-    // brokers to publish the same set of deleted objects.
-
     class DeletedObject {
       public:
         typedef boost::shared_ptr<DeletedObject> shared_ptr;
         DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
-        DeletedObject( const std::string &encoded );
         ~DeletedObject() {};
-        void encode( std::string& toBuffer );
         const std::string getKey() const {
             // used to batch up objects of the same class type
             return std::string(packageName + std::string(":") + className);
@@ -183,22 +150,7 @@ public:
 
     typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
 
-    /** returns a snapshot of all currently deleted management objects. */
-    void exportDeletedObjects( DeletedObjectList& outList );
-
-    /** Import a list of deleted objects to send on next publish interval. */
-    void importDeletedObjects( const DeletedObjectList& inList );
-
 private:
-    struct Periodic : public qpid::sys::TimerTask
-    {
-        ManagementAgent& agent;
-
-        Periodic (ManagementAgent& agent, uint32_t seconds);
-        virtual ~Periodic ();
-        void fire ();
-    };
-
     //  Storage for tracking remote management agents, attached via the client
     //  management agent API.
     //

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1422061-1438053

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Jan 25 18:20:39 2013
@@ -136,12 +136,6 @@ MessageStorePlugin::providerAvailable(co
         QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
 }
 
-void
-MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
-{
-    QPID_LOG(info, "Store: truncateInit");
-}
-
 
 /**
  * Record the existence of a durable queue

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h Fri Jan 25 18:20:39 2013
@@ -24,18 +24,22 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
-#include "qpid/broker/Broker.h"
 #include "qpid/broker/MessageStore.h"
-#include "qpid/broker/PersistableExchange.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/management/Manageable.h"
+//#include "qpid/management/Manageable.h"
 
 #include <string>
 
 using namespace qpid;
 
 namespace qpid {
+
+namespace broker {
+class Broker;
+class PersistableExchange;
+class PersistableMessage;
+class PersistableQueue;
+}
+
 namespace store {
 
 class StorageProvider;
@@ -82,18 +86,6 @@ class MessageStorePlugin :
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called before recovery, will discard the database and reinitialize
-     * using an empty store. This is used when cluster nodes recover and
-     * must get their content from a cluster sync rather than directly from
-     * the store.
-     *
-     * @param saveStoreContent    If true, the store's contents should be
-     *                            saved to a backup location before
-     *                            reinitializing the store content.
-     */
-    virtual void truncateInit(const bool saveStoreContent = false);
 
     /**
      * Record the existence of a durable queue

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h Fri Jan 25 18:20:39 2013
@@ -143,20 +143,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get thier content from a cluster sync rather than directly fromt the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
 
     /**
      * Record the existence of a durable queue

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Jan 25 18:20:39 2013
@@ -26,6 +26,7 @@
 #include <string>
 #include <windows.h>
 #include <clfsw32.h>
+#include <qpid/broker/Broker.h>
 #include <qpid/broker/RecoverableQueue.h>
 #include <qpid/log/Statement.h>
 #include <qpid/store/MessageStorePlugin.h>
@@ -108,20 +109,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get their content from a cluster sync rather than directly from the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false);
 
     /**
      * Record the existence of a durable queue
@@ -467,11 +454,6 @@ MSSqlClfsProvider::activate(MessageStore
 }
 
 void
-MSSqlClfsProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
 MSSqlClfsProvider::create(PersistableQueue& queue,
                           const qpid::framing::FieldTable& /*args needed for jrnl*/)
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Fri Jan 25 18:20:39 2013
@@ -92,20 +92,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get thier content from a cluster sync rather than directly fromt the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false);
 
     /**
      * Record the existence of a durable queue
@@ -392,11 +378,6 @@ MSSqlProvider::activate(MessageStorePlug
 }
 
 void
-MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
 MSSqlProvider::create(PersistableQueue& queue,
                       const qpid::framing::FieldTable& /*args needed for jrnl*/)
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Jan 25 18:20:39 2013
@@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys:
     }
 };
 
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool nodict0) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
     reads(0),
     readError(false),
-    isClient(false),
+    isClient(isClient0),
     nodict(nodict0),
     readCredit(InfiniteCredit)
 {}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Fri Jan 25 18:20:39 2013
@@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputCon
     void write(const framing::ProtocolInitiation&);
 
   public:
-    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool nodict);
+    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
     QPID_COMMON_EXTERN ~AsynchIOHandler();
     QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
 
-    QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
     // Output side
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h Fri Jan 25 18:20:39 2013
@@ -54,6 +54,15 @@ class FileSysDir
 
     void mkdir(void);
 
+    typedef void Callback(const std::string&);
+
+    /**
+     * Call the Callback function for every regular file in the directory
+     *
+     * @param cb Callback function that receives the full path to the file
+     */
+    void forEachFile(Callback cb) const;
+
     std::string getPath   () { return dirPath; }
 };
  

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h Fri Jan 25 18:20:39 2013
@@ -1,3 +1,6 @@
+#ifndef QPID_SYS_OUTPUT_CONTROL_H
+#define QPID_SYS_OUTPUT_CONTROL_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,9 +24,6 @@
 
 #include "qpid/sys/IntegerTypes.h"
 
-#ifndef _OutputControl_
-#define _OutputControl_
-
 namespace qpid {
 namespace sys {
 
@@ -40,4 +40,4 @@ namespace sys {
 }
 
 
-#endif
+#endif /*!QPID_SYS_OUTPUT_CONTROL_H*/

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h Fri Jan 25 18:20:39 2013
@@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::Sha
     virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
+        const std::string& name,
         const std::string& host, const std::string& port,
         ConnectionCodec::Factory* codec,
         ConnectFailedCallback failed) = 0;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/rdma/RdmaIO.h"
@@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputContr
 };
 
 RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
-    identifier(c->getFullName()),
+    identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
     factory(f),
     codec(0),
     readError(false),
@@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public Pro
   public:
     RdmaIOProtocolFactory(int16_t port, int backlog);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+    void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
 
     uint16_t getPort() const;
 
@@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Po
 
 void RdmaIOProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& /*name*/,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* f,
     ConnectFailedCallback failed)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
@@ -76,15 +77,16 @@ class SslProtocolFactory : public Protoc
     SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
                        Timer& timer);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+    void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  ConnectFailedCallback);
 
     uint16_t getPort() const;
 
   private:
-    void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
-                     bool isClient);
+    void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+    void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
     void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
 };
 
@@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(c
     }
 }
 
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+                                          ConnectionCodec::Factory* f) {
+    AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+    establishedCommon(async, poller, s);
+}
 
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
-                                     ConnectionCodec::Factory* f, bool isClient) {
-
-    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+                                             ConnectionCodec::Factory* f, const std::string& name) {
+    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+    establishedCommon(async, poller, s);
+}
 
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
     }
 
-    if (isClient) {
-        async->setClient();
-    }
-
     AsynchIO* aio = AsynchIO::create(
         s,
         boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::
     for (unsigned i = 0; i<listeners.size(); ++i) {
         acceptors.push_back(
             AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+                            boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
         acceptors[i].start(poller);
     }
 }
@@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed(
 
 void SslProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& name,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
@@ -289,8 +295,8 @@ void SslProtocolFactory::connect(
         *socket,
         host,
         port,
-        boost::bind(&SslProtocolFactory::established,
-                    this, poller, _1, fact, true),
+        boost::bind(&SslProtocolFactory::establishedOutgoing,
+                    this, poller, _1, fact, name),
         boost::bind(&SslProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
@@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public P
   public:
     AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+    void connect(Poller::shared_ptr, const std::string& name,
+                 const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  ConnectFailedCallback);
 
     uint16_t getPort() const;
 
   private:
-    void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
-                     bool isClient);
+    void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+    void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
     void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
 };
 
@@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtoco
     }
 }
 
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
-                                          ConnectionCodec::Factory* f, bool isClient) {
-    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+                                          ConnectionCodec::Factory* f) {
+    AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+    establishedCommon(async, poller, s);
+}
+
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+                                              ConnectionCodec::Factory* f, const std::string& name) {
+    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+    establishedCommon(async, poller, s);
+}
 
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
     }
 
-    if (isClient)
-        async->setClient();
     AsynchIO* aio = AsynchIO::create
       (s,
        boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Pol
     for (unsigned i = 0; i<listeners.size(); ++i) {
         acceptors.push_back(
             AsynchAcceptor::create(listeners[i],
-                            boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+                            boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
         acceptors[i].start(poller);
     }
 }
@@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFai
 
 void AsynchIOProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& name,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
@@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect(
         *socket,
         host,
         port,
-        boost::bind(&AsynchIOProtocolFactory::established,
-                    this, poller, _1, fact, true),
+        boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+                    this, poller, _1, fact, name),
         boost::bind(&AsynchIOProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp Fri Jan 25 18:20:39 2013
@@ -96,12 +96,7 @@ void TimerTask::cancel() {
     state = CANCELLED;
 }
 
-void TimerTask::setFired() {
-    // Set nextFireTime to just before now, making readyToFire() true.
-    nextFireTime = AbsTime(sys::now(), Duration(-1));
-}
-
-
+// TODO AStitcher 21/08/09 The thresholds for emitting warnings are a little arbitrary
 Timer::Timer() :
     active(false),
     late(50 * TIME_MSEC),
@@ -133,7 +128,6 @@ public:
     }
 };
 
-// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary
 void Timer::run()
 {
     Monitor::ScopedLock l(monitor);
@@ -151,10 +145,6 @@ void Timer::run()
             {
             TimerTaskCallbackScope s(*t);
             if (s) {
-                {
-                    Monitor::ScopedUnlock u(monitor);
-                    drop(t);
-                }
                 if (delay > lateCancel) {
                     QPID_LOG(debug, t->name << " cancelled timer woken up " <<
                              delay / TIME_MSEC << "ms late");
@@ -235,9 +225,6 @@ void Timer::fire(boost::intrusive_ptr<Ti
     }
 }
 
-// Provided for subclasses: called when a task is droped.
-void Timer::drop(boost::intrusive_ptr<TimerTask>) {}
-
 bool operator<(const intrusive_ptr<TimerTask>& a,
                        const intrusive_ptr<TimerTask>& b)
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h Fri Jan 25 18:20:39 2013
@@ -67,10 +67,6 @@ class TimerTask : public RefCounted {
 
     std::string getName() const { return name; }
 
-    // Move the nextFireTime so readyToFire is true.
-    // Used by the cluster, where tasks are fired on cluster events, not on local time.
-    QPID_COMMON_EXTERN void setFired();
-
   protected:
     // Must be overridden with callback
     virtual void fire() = 0;
@@ -99,7 +95,7 @@ class Timer : private Runnable {
 
   protected:
     QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
-    QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
+
     // Allow derived classes to change the late/overran thresholds.
     Duration late;
     Duration overran;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp Fri Jan 25 18:20:39 2013
@@ -162,11 +162,6 @@ void BSDSocket::connect(const SocketAddr
     // remote port (which is unoccupied) as the port to bind the local
     // end of the socket, resulting in a "circular" connection.
     //
-    // This seems like something the OS should prevent but I have
-    // confirmed that sporadic hangs in
-    // cluster_tests.LongTests.test_failover on RHEL5 are caused by
-    // such a circular connection.
-    //
     // Raise an error if we see such a connection, since we know there is
     // no listener on the peer address.
     //

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp Fri Jan 25 18:20:39 2013
@@ -18,6 +18,7 @@
 
 #include "qpid/sys/FileSysDir.h"
 #include "qpid/sys/StrError.h"
+#include "qpid/log/Statement.h"
 #include "qpid/Exception.h"
 
 #include <sys/types.h>
@@ -25,6 +26,8 @@
 #include <fcntl.h>
 #include <cerrno>
 #include <unistd.h>
+#include <dirent.h>
+#include <stdlib.h>
 
 namespace qpid {
 namespace sys {
@@ -51,4 +54,38 @@ void FileSysDir::mkdir(void)
         throw Exception ("Can't create directory: " + dirPath);
 }
 
+// Invoke cb with the full path (dirPath + "/" + entry name) of every regular
+// file in the directory, visiting entries in alphasort() order. Entries for
+// which stat() fails, and entries that are not regular files, are skipped.
+// Throws Exception if the directory cannot be scanned.
+void FileSysDir::forEachFile(Callback cb) const {
+
+    ::dirent** namelist = 0;
+
+    int n = scandir(dirPath.c_str(), &namelist, 0, alphasort);
+    if (n == -1) throw Exception (strError(errno) + ": Can't scan directory: " + dirPath);
+
+    // scandir() malloc()s the array and each entry. Release them from a
+    // destructor so nothing leaks if the callback (or building the path
+    // string) throws part way through the listing.
+    struct NameListGuard {
+        ::dirent** entries;
+        int count;
+        NameListGuard(::dirent** e, int c) : entries(e), count(c) {}
+        ~NameListGuard() {
+            for (int i = 0; i<count; ++i) ::free(entries[i]);
+            ::free(entries);
+        }
+    } guard(namelist, n);
+
+    for (int i = 0; i<n; ++i) {
+        std::string fullpath = dirPath + "/" + namelist[i]->d_name;
+        // Filter out non files/stat problems etc.
+        struct ::stat s;
+        if (!::stat(fullpath.c_str(), &s) && S_ISREG(s.st_mode)) {
+            cb(fullpath);
+        }
+    }
+}
+
 }} // namespace qpid::sys

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -77,51 +77,6 @@ inline bool isLoopback(const ::sockaddr*
     }
 }
 
-void SystemInfo::getLocalIpAddresses (uint16_t port,
-                                      std::vector<Address> &addrList) {
-    ::ifaddrs* ifaddr = 0;
-    QPID_POSIX_CHECK(::getifaddrs(&ifaddr));
-    for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) {
-        if (ifap->ifa_addr == 0) continue;
-        if (isLoopback(ifap->ifa_addr)) continue;
-        int family = ifap->ifa_addr->sa_family;
-        switch (family) {
-            case AF_INET6: {
-                // Ignore link local addresses as:
-                // * The scope id is illegal in URL syntax
-                // * Clients won't be able to use a link local address
-                //   without adding their own (potentially different) scope id
-                sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr);
-                if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
-                // Fallthrough
-            }
-            case AF_INET: {
-              char dispName[NI_MAXHOST];
-              int rc = ::getnameinfo(
-                  ifap->ifa_addr,
-                  (family == AF_INET)
-                  ? sizeof(struct sockaddr_in)
-                  : sizeof(struct sockaddr_in6),
-                  dispName, sizeof(dispName),
-                  0, 0, NI_NUMERICHOST);
-              if (rc != 0) {
-                  throw QPID_POSIX_ERROR(rc);
-              }
-              string addr(dispName);
-              addrList.push_back(Address(TCP, addr, port));
-              break;
-          }
-          default:
-            continue;
-        }
-    }
-    ::freeifaddrs(ifaddr);
-
-    if (addrList.empty()) {
-        addrList.push_back(Address(TCP, LOOPBACK, port));
-    }
-}
-
 namespace {
     inline socklen_t sa_len(::sockaddr* sa)
     {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -60,31 +60,6 @@ bool SystemInfo::getLocalHostname(Addres
 static const string LOCALHOST("127.0.0.1");
 static const string TCP("tcp");
 
-void SystemInfo::getLocalIpAddresses(uint16_t port,
-                                     std::vector<Address> &addrList) {
-    int s = socket(PF_INET, SOCK_STREAM, 0);
-    for (int i=1;;i++) {
-        struct lifreq ifr;
-        ifr.lifr_index = i;
-        if (::ioctl(s, SIOCGIFADDR, &ifr) < 0) {
-            break;
-        }
-        struct sockaddr *sa = static_cast<struct sockaddr *>((void *) &ifr.lifr_addr);
-        if (sa->sa_family != AF_INET) {
-            // TODO: Url parsing currently can't cope with IPv6 addresses, defer for now
-            break;
-        }
-        struct sockaddr_in *sin = static_cast<struct sockaddr_in *>((void *)sa);
-        std::string addr(inet_ntoa(sin->sin_addr));
-        if (addr != LOCALHOST)
-            addrList.push_back(Address(TCP, addr, port));
-    }
-    if (addrList.empty()) {
-        addrList.push_back(Address(TCP, LOCALHOST, port));
-    }
-    close (s);
-}
-
 void SystemInfo::getSystemId(std::string &osName,
                              std::string &nodeName,
                              std::string &release,

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp Fri Jan 25 18:20:39 2013
@@ -31,8 +31,6 @@
 
 #include <iostream>
 #include <fstream>
-#include <boost/filesystem/operations.hpp>
-#include <boost/filesystem/path.hpp>
 
 namespace qpid {
 namespace sys {
@@ -82,15 +80,14 @@ SslOptions SslOptions::global;
 char* readPasswordFromFile(PK11SlotInfo*, PRBool retry, void*)
 {
     const std::string& passwordFile = SslOptions::global.certPasswordFile;
-    if (retry || passwordFile.empty() || !boost::filesystem::exists(passwordFile)) {
-        return 0;
-    } else {
-        std::ifstream file(passwordFile.c_str());
-        std::string password;
-        file >> password;
-        return PL_strdup(password.c_str());
-    }
-}    
+    if (retry || passwordFile.empty()) return 0;
+    std::ifstream file(passwordFile.c_str());
+    if (!file) return 0;
+
+    std::string password;
+    file >> password;
+    return PL_strdup(password.c_str());
+}
 
 void initNSS(const SslOptions& options, bool server)
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp Fri Jan 25 18:20:39 2013
@@ -24,6 +24,9 @@
 #include <sys/stat.h>
 #include <direct.h>
 #include <errno.h>
+#include <windows.h>
+#include <strsafe.h>
+
 
 namespace qpid {
 namespace sys {
@@ -50,4 +53,44 @@ void FileSysDir::mkdir(void)
         throw Exception ("Can't create directory: " + dirPath);
 }
 
+// Invoke cb once for every entry of the directory that is not itself a
+// directory. The callback receives the bare entry name (cFileName).
+// NOTE(review): the POSIX implementation passes dirPath + "/" + name;
+// confirm which form callers expect before relying on either.
+// Throws Exception if dirPath is too long to build a search pattern from;
+// returns without calling cb if the search cannot be started.
+void FileSysDir::forEachFile(Callback cb) const {
+
+    // create dirPath+"\*" as the search pattern. std::string and the
+    // explicitly ANSI (...A) find calls are used throughout so the code
+    // does not depend on whether UNICODE is defined.
+    if (dirPath.size() > (MAX_PATH - 3)) {
+        throw Exception ("Directory path is too long: " + dirPath);
+    }
+    const std::string pattern = dirPath + "\\*";
+
+    // Special work for first file
+    WIN32_FIND_DATAA findFileData;
+    HANDLE hFind = FindFirstFileA(pattern.c_str(), &findFileData);
+    if (INVALID_HANDLE_VALUE == hFind) {
+        return;
+    }
+
+    // A search handle must be released with FindClose(). Do it from a
+    // destructor so the handle is also closed if the callback throws.
+    struct FindHandleGuard {
+        HANDLE handle;
+        explicit FindHandleGuard(HANDLE h) : handle(h) {}
+        ~FindHandleGuard() { FindClose(handle); }
+    } guard(hFind);
+
+    // process everything that isn't a directory
+    do {
+        if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) {
+            std::string fileName(findFileData.cFileName);
+            cb(fileName);
+        }
+    } while (FindNextFileA(hFind, &findFileData) != 0);
+}
+
 }} // namespace qpid::sys

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -67,35 +67,6 @@ bool SystemInfo::getLocalHostname (Addre
 static const std::string LOCALHOST("127.0.0.1");
 static const std::string TCP("tcp");
 
-void SystemInfo::getLocalIpAddresses (uint16_t port,
-                                      std::vector<Address> &addrList) {
-    enum { MAX_URL_INTERFACES = 100 };
-
-    SOCKET s = socket (PF_INET, SOCK_STREAM, 0);
-    if (s != INVALID_SOCKET) {
-        INTERFACE_INFO interfaces[MAX_URL_INTERFACES];
-        DWORD filledBytes = 0;
-        WSAIoctl (s,
-                  SIO_GET_INTERFACE_LIST,
-                  0,
-                  0,
-                  interfaces,
-                  sizeof (interfaces),
-                  &filledBytes,
-                  0,
-                  0);
-        unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO);
-        for (unsigned int i = 0; i < interfaceCount; ++i) {
-            if (interfaces[i].iiFlags & IFF_UP) {
-                std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr));
-                if (addr != LOCALHOST)
-                    addrList.push_back(Address(TCP, addr, port));
-            }
-        }
-        closesocket (s);
-    }
-}
-
 // Null function which always fails to find an network interface name
 bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&)
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp Fri Jan 25 18:20:39 2013
@@ -110,21 +110,32 @@ class VariantImpl
     } value;
     std::string encoding;//optional encoding for variable length data
 
+    // Convert the held string to numeric type T using boost::lexical_cast.
+    // Throws InvalidConversion if the text does not convert to T, or if T is
+    // unsigned and the text is a negative number other than negative zero.
     template<class T> T convertFromString() const
     {
         const std::string& s = *value.string;
+
         try {
-            T r = boost::lexical_cast<T>(s);
-            //lexical_cast won't fail if string is a negative number and T is unsigned
-            //So check that and allow special case of negative zero
-            //else its a non-zero negative number so throw exception at end of function
-            if (std::numeric_limits<T>::is_signed || s.find('-') != 0 || r == 0) {
-                return r;
+            // Signed targets, and any text without a leading '-', are
+            // converted whole so that the most negative value of T parses
+            // and a doubled sign such as "--5" is rejected.
+            if (std::numeric_limits<T>::is_signed || s[0] != '-') {
+                return boost::lexical_cast<T>(s);
+            }
+            // Unsigned target with a leading '-': only negative zero is
+            // acceptable. Extra shenanigans to work around negative zero
+            // conversion error in older GCC libs: strip the sign and
+            // convert the magnitude, refusing a second sign character.
+            if (s.size() > 1 && s[1] != '-' && s[1] != '+') {
+                if (boost::lexical_cast<T>(s.substr(1)) == 0) return 0;
             }
         } catch(const boost::bad_lexical_cast&) {
         }
         throw InvalidConversion(QPID_MSG("Cannot convert " << s));
     }
+
 };
 
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp Fri Jan 25 18:20:39 2013
@@ -75,17 +75,19 @@ int run_broker(int argc, char *argv[], b
         for (vector<string>::iterator iter = bootOptions.module.load.begin();
              iter != bootOptions.module.load.end();
              iter++)
-            qpid::tryShlib (iter->data(), false);
+            qpid::tryShlib (*iter);
 
         if (!bootOptions.module.noLoad) {
             bool isDefault = defaultPath == bootOptions.module.loadDir;
             qpid::loadModuleDir (bootOptions.module.loadDir, isDefault);
         }
 
-        // Parse options
+        // Parse options.  In the second pass, do not allow unknown options.
+        // All the modules have been added now, so any unknown options
+        // should be flagged as errors.
         try {
             options.reset(new QpiddOptions(argv[0]));
-            options->parse(argc, argv, options->common.config);
+            options->parse(argc, argv, options->common.config, false);
         } catch (const std::exception& /*e*/) {
             if (helpArgSeen) {
                  // provide help even when parsing fails

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk Fri Jan 25 18:20:39 2013
@@ -50,7 +50,8 @@ sslconnector_la_SOURCES = \
 
 if HAVE_PROTON
 sslconnector_la_SOURCES += \
-  qpid/messaging/amqp/SslTransport.cpp
+  qpid/messaging/amqp/SslTransport.cpp \
+  qpid/messaging/amqp/SslTransport.h
 endif #HAVE_PROTON
 
 

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests:r1422061-1438053

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp Fri Jan 25 18:20:39 2013
@@ -39,21 +39,6 @@
    fun:_sasl_load_plugins
    fun:sasl_client_init
 }
-{
-   Benign leak in CPG - patched version.
-   Memcheck:Leak
-   fun:*
-   fun:openais_service_connect
-   fun:cpg_initialize
-}
-
-{
-   Benign error in libcpg.
-   Memcheck:Param
-   socketcall.sendmsg(msg.msg_iov[i])
-   obj:*/libpthread-2.5.so
-   obj:*/libcpg.so.2.0.0
-}
 
 {
    Uninitialised value problem in _dl_relocate (F7, F8)
@@ -161,14 +146,6 @@
 }
 
 {
-  CPG error - seems benign.
-   Memcheck:Param
-   socketcall.sendmsg(msg.msg_iov[i])
-   obj:*
-   obj:*/libcpg.so.2.0.0
-}
-
-{
    Known leak in boost.thread 1.33.1. Wildcards for 64/32 bit diffs.
    Memcheck:Leak
    fun:*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org