You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/01/25 19:20:49 UTC
svn commit: r1438629 [5/10] - in
/qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/
qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qpid/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/t/ qpid/cpp/bindi...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jan 25 18:20:39 2013
@@ -19,6 +19,12 @@
*
*/
#include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iterator>
@@ -26,37 +32,57 @@
namespace qpid {
namespace ha {
+namespace _qmf = ::qmf::org::apache::qpid::ha;
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+}
+
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
brokers.clear();
- brokers[b.getSystemId()] = b;
+ brokers[self] = me;
}
void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- }
+ update(l);
+ }
}
bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
+ Mutex::ScopedLock l(lock);
brokers.clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
+ update(l);
}
types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -64,6 +90,7 @@ types::Variant::List Membership::asList(
}
BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups
return result;
}
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
return true;
}
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
- return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+ QPID_LOG(info, "Membership: " << brokers);
+ Variant::List brokers = asList();
+ if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+ if (mgmtObject) mgmtObject->set_members(brokers);
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ BrokerStatus status = getStatus();
+ QPID_LOG(info, "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ if (!legal) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+ << " -> " << printable(newStatus)));
+ }
+
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+ update(l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
}
+// FIXME aconway 2013-01-23: move to .h?
}} // namespace qpid::ha
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.h Fri Jan 25 18:20:39 2013
@@ -24,45 +24,72 @@
#include "BrokerInfo.h"
#include "types.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/types/Variant.h"
#include <boost/function.hpp>
#include <set>
#include <vector>
#include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
namespace ha {
+class HaBroker;
/**
* Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management when events on membership changes.
+ * THREAD SAFE
*/
class Membership
{
public:
- Membership(const types::Uuid& self_) : self(self_) {}
+ Membership(const BrokerInfo& info, HaBroker&);
+
+ void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
- void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+ void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
- bool get(const types::Uuid& id, BrokerInfo& result);
+ bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+ types::Uuid getSelf() const { return self; }
+ BrokerInfo getInfo() const;
+ BrokerStatus getStatus() const;
+ void setStatus(BrokerStatus s);
private:
- types::Uuid self;
+ void update(sys::Mutex::ScopedLock&);
+ BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+ mutable sys::Mutex lock;
+ HaBroker& haBroker;
+ boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+ const types::Uuid self;
BrokerInfo::Map brokers;
- friend std::ostream& operator<<(std::ostream&, const Membership&);
};
-std::ostream& operator<<(std::ostream&, const Membership&);
-
}} // namespace qpid::ha
#endif /*!QPID_HA_MEMBERSHIP_H*/
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jan 25 18:20:39 2013
@@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public s
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("Primary: "), active(false)
+ haBroker(hb), membership(hb.getMembership()),
+ logPrefix("Primary: "), active(false)
{
+ hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -96,7 +98,7 @@ Primary::Primary(HaBroker& hb, const Bro
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), false));
+ new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const Bro
hb.getBroker().getTimer().add(timerTask);
}
+
+ // Remove backup tag property from outgoing link properties.
+ framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ hb.getBroker().setLinkClientProperties(linkProperties);
+
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
checkReady(l);
+
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
@@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLo
active = true;
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- haBroker.activate();
+ membership.setStatus(ACTIVE);
}
}
@@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iter
if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.addBroker(info);
+ membership.add(info);
if (expectedBackups.erase(i->second)) {
QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
checkReady(l);
@@ -161,9 +170,10 @@ void Primary::timeoutExpectedBackups() {
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
- // Downgrade the broker to CATCHUP
+ // Downgrade the broker's status to CATCHUP
+ // The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else ++i;
}
@@ -228,7 +238,7 @@ void Primary::opened(broker::Connection&
if (i == backups.end()) {
QPID_LOG(info, logPrefix << "New backup connected: " << info);
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
@@ -238,11 +248,11 @@ void Primary::opened(broker::Connection&
}
else {
QPID_LOG(info, logPrefix << "Known backup connected: " << info);
- i->second->setConnected(true);
+ i->second->setConnection(&connection);
checkReady(i, l);
}
if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -259,7 +269,7 @@ void Primary::closed(broker::Connection&
// Checking isConnected() lets us ignore such spurious closes.
if (i != backups.end() && i->second->isConnected()) {
QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
- haBroker.removeBroker(info.getSystemId());
+ membership.remove(info.getSystemId());
expectedBackups.erase(i->second);
backups.erase(i);
checkReady(l);
@@ -275,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::g
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
+Role* Primary::promote() {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ return 0;
+}
+
}} // namespace qpid::ha
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h Fri Jan 25 18:20:39 2013
@@ -24,6 +24,7 @@
#include "types.h"
#include "BrokerInfo.h"
+#include "Role.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -48,6 +49,7 @@ class HaBroker;
class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
+class Membership;
/**
* State associated with a primary broker:
@@ -56,7 +58,7 @@ class QueueGuard;
*
* THREAD SAFE: called concurrently in arbitrary connection threads.
*/
-class Primary
+class Primary : public Role
{
public:
typedef boost::shared_ptr<broker::Queue> QueuePtr;
@@ -67,6 +69,11 @@ class Primary
Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
~Primary();
+ // Role implementation
+ std::string getLogPrefix() const { return logPrefix; }
+ Role* promote();
+ void setBrokerUrl(const Url&) {}
+
void readyReplica(const ReplicatingSubscription&);
void removeReplica(const std::string& q);
@@ -94,12 +101,13 @@ class Primary
sys::Mutex lock;
HaBroker& haBroker;
+ Membership& membership;
std::string logPrefix;
bool active;
/**
* Set of expected backups that must be ready before we declare ourselves
- * active. These are backups that were known before the primary crashed. As
- * new primary we expect them to re-connect.
+ * active. These are backups that were known and ready before the primary
+ * crashed. As new primary we expect them to re-connect.
*/
BackupSet expectedBackups;
/**
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Jan 25 18:20:39 2013
@@ -66,7 +66,7 @@ QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Jan 25 18:20:39 2013
@@ -260,9 +260,7 @@ void QueueReplicator::route(Deliverable&
// Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
- haBroker.shutdown();
- throw;
+ haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Jan 25 18:20:39 2013
@@ -21,6 +21,7 @@
#include "RemoteBackup.h"
#include "QueueGuard.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/log/Statement.h"
@@ -32,9 +33,10 @@ namespace ha {
using sys::Mutex;
using boost::bind;
-RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary: Remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
+RemoteBackup::RemoteBackup(
+ const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
+ brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
{}
void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -46,13 +48,19 @@ void RemoteBackup::setCatchupQueues(brok
RemoteBackup::~RemoteBackup() { cancel(); }
void RemoteBackup::cancel() {
+ QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
+ << " backup: " << brokerInfo);
for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
i->second->cancel();
guards.clear();
+ if (connection) {
+ connection->abort();
+ connection = 0;
+ }
}
bool RemoteBackup::isReady() {
- return connected && catchupQueues.empty();
+ return connection && catchupQueues.empty();
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h Fri Jan 25 18:20:39 2013
@@ -33,6 +33,7 @@ namespace qpid {
namespace broker {
class Queue;
class QueueRegistry;
+class Connection;
}
namespace ha {
@@ -54,7 +55,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
+ RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
~RemoteBackup();
/** Set all queues in the registry as catch-up queues.
@@ -66,8 +67,8 @@ class RemoteBackup
GuardPtr guard(const QueuePtr&);
/** Is the remote backup connected? */
- void setConnected(bool b) { connected=b; }
- bool isConnected() const { return connected; }
+ void setConnection(broker::Connection* c) { connection = c; }
+ bool isConnected() const { return connection; }
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
@@ -101,7 +102,7 @@ class RemoteBackup
ReplicationTest replicationTest;
GuardMap guards;
QueueSet catchupQueues;
- bool connected;
+ broker::Connection* connection;
bool reportedReady;
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Jan 25 18:20:39 2013
@@ -65,6 +65,8 @@ class QueueGuard;
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
+ typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
struct Factory : public broker::ConsumerFactory {
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Jan 25 18:20:39 2013
@@ -32,6 +32,11 @@
namespace qpid {
namespace ha {
+// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// against bad promotion if there are READY brokers when this broker starts.
+// It will not help the situation where brokers became READY after this one starts.
+//
+
/**
* Check whether a JOINING broker can be promoted .
*
@@ -49,8 +54,10 @@ class StatusCheck
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
- void setPromote(bool p);
+
private:
+ void setPromote(bool p);
+
std::string logPrefix;
sys::Mutex lock;
std::vector<sys::Thread> threads;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Jan 25 18:20:39 2013
@@ -28,11 +28,13 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/ConnectionState.h"
@@ -47,6 +49,9 @@
#include <sstream>
#include <typeinfo>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
namespace qpid {
namespace management {
@@ -92,6 +97,32 @@ struct ScopedManagementContext
setManagementExecutionContext(0);
}
};
+
+typedef boost::function0<void> FireFunction;
+struct Periodic : public qpid::sys::TimerTask
+{
+ FireFunction fireFunction;
+ qpid::sys::Timer* timer;
+
+ Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+};
+
+Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds)
+ : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+ "ManagementAgent::periodicProcessing"),
+ fireFunction(f), timer(t) {}
+
+Periodic::~Periodic() {}
+
+void Periodic::fire()
+{
+ setupNextFire();
+ timer->add(this);
+ fireFunction();
+}
+
}
@@ -170,6 +201,8 @@ void ManagementAgent::configure(const st
sendQueue.reset(
new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller()));
sendQueue->start();
+ timer = &broker->getTimer();
+ timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -212,13 +245,6 @@ void ManagementAgent::configure(const st
}
}
-void ManagementAgent::pluginsInitialized() {
- // Do this here so cluster plugin has the chance to set up the timer.
- timer = &broker->getClusterTimer();
- timer->add(new Periodic(*this, interval));
-}
-
-
void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
{
if (vendor.find(':') != vendor.npos) {
@@ -420,20 +446,6 @@ void ManagementAgent::raiseEvent(const M
}
}
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
- "ManagementAgent::periodicProcessing"),
- agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic() {}
-
-void ManagementAgent::Periodic::fire()
-{
- setupNextFire();
- agent.timer->add(this);
- agent.periodicProcessing();
-}
-
void ManagementAgent::clientAdded (const string& routingKey)
{
sys::Mutex::ScopedLock lock(userLock);
@@ -477,17 +489,6 @@ void ManagementAgent::clientAdded (const
}
}
-void ManagementAgent::clusterUpdate() {
- // Called on all cluster memebers when a new member joins a cluster.
- // Set clientWasAdded so that on the next periodicProcessing we will do
- // a full update on all cluster members.
- sys::Mutex::ScopedLock l(userLock);
- moveNewObjects(); // keep lists consistent with updater/updatee.
- moveDeletedObjects();
- clientWasAdded = true;
- debugSnapshot("Cluster member joined");
-}
-
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
@@ -2550,70 +2551,6 @@ void ManagementAgent::SchemaClass::mapDe
}
}
-void ManagementAgent::exportSchemas(string& out) {
- Variant::List list_;
- Variant::Map map_, kmap, cmap;
-
- for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
- string name = i->first;
- const ClassMap& classes = i ->second;
- for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
- const SchemaClassKey& key = j->first;
- const SchemaClass& klass = j->second;
- if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
- // Encode name, schema-key, schema-class
-
- map_.clear();
- kmap.clear();
- cmap.clear();
-
- key.mapEncode(kmap);
- klass.mapEncode(cmap);
-
- map_["_pname"] = name;
- map_["_key"] = kmap;
- map_["_class"] = cmap;
- list_.push_back(map_);
- }
- }
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
-
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
-
-
- for (l = content.begin(); l != content.end(); l++) {
- string package;
- SchemaClassKey key;
- SchemaClass klass;
- Variant::Map map_, kmap, cmap;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_pname")) != map_.end()) {
- package = i->second.asString();
-
- if ((i = map_.find("_key")) != map_.end()) {
- key.mapDecode(i->second.asMap());
-
- if ((i = map_.find("_class")) != map_.end()) {
- klass.mapDecode(i->second.asMap());
-
- packages[package][key] = klass;
- }
- }
- }
- }
-}
-
void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
Variant::Map _objId, _values;
@@ -2657,52 +2594,6 @@ void ManagementAgent::RemoteAgent::mapDe
mgmtObject->set_connectionRef(connectionRef);
}
-void ManagementAgent::exportAgents(string& out) {
- Variant::List list_;
- Variant::Map map_, omap, amap;
-
- for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
- i != remoteAgents.end();
- ++i)
- {
- // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
- boost::shared_ptr<RemoteAgent> agent(i->second);
-
- map_.clear();
- amap.clear();
-
- agent->mapEncode(amap);
- map_["_remote_agent"] = amap;
- list_.push_back(map_);
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
- sys::Mutex::ScopedLock lock(userLock);
-
- for (l = content.begin(); l != content.end(); l++) {
- boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- Variant::Map map_;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_remote_agent")) != map_.end()) {
-
- agent->mapDecode(i->second.asMap());
-
- addObject (agent->mgmtObject, 0, false);
- remoteAgents[agent->connectionRef] = agent;
- }
- }
-}
-
namespace {
bool isDeletedMap(const ManagementObjectMap::value_type& value) {
return value.second->isDeleted();
@@ -2781,54 +2672,6 @@ Variant::Map ManagementAgent::toMap(cons
return map;
}
-
-// Build up a list of the current set of deleted objects that are pending their
-// next (last) publish-ment.
-void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
-{
- outList.clear();
-
- sys::Mutex::ScopedLock lock (userLock);
-
- moveNewObjects();
- moveDeletedObjects();
-
- // now copy the pending deletes into the outList
- for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
- mIter != pendingDeletedObjs.end(); mIter++) {
- for (DeletedObjectList::iterator lIter = mIter->second.begin();
- lIter != mIter->second.end(); lIter++) {
- outList.push_back(*lIter);
- }
- }
-}
-
-// Called by cluster to reset the management agent's list of deleted
-// objects to match the rest of the cluster.
-void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
-{
- sys::Mutex::ScopedLock lock (userLock);
- sys::Mutex::ScopedLock objLock(objectLock);
- // Clear out any existing deleted objects
- moveNewObjects();
- pendingDeletedObjs.clear();
- ManagementObjectMap::iterator i = managementObjects.begin();
- // Silently drop any deleted objects left over from receiving the update.
- while (i != managementObjects.end()) {
- ManagementObject::shared_ptr object = i->second;
- if (object->isDeleted()) {
- managementObjects.erase(i++);
- }
- else ++i;
- }
- for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
-
- std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
- pendingDeletedObjs[classkey].push_back(*lIter);
- }
-}
-
-
// construct a DeletedObject from a management object.
ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2)
: packageName(src->getPackageName()),
@@ -2866,45 +2709,6 @@ ManagementAgent::DeletedObject::DeletedO
}
}
-
-
-// construct a DeletedObject from an encoded representation. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject::encode() for the reverse.
-ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
-{
- qpid::types::Variant::Map map_;
- MapCodec::decode(encoded, map_);
-
- packageName = map_["_package_name"].getString();
- className = map_["_class_name"].getString();
- objectId = map_["_object_id"].getString();
-
- encodedV1Config = map_["_v1_config"].getString();
- encodedV1Inst = map_["_v1_inst"].getString();
- encodedV2 = map_["_v2_data"].asMap();
-}
-
-
-// encode a DeletedObject to a string buffer. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject(const std::string&) for the reverse.
-void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
-{
- qpid::types::Variant::Map map_;
-
-
- map_["_package_name"] = packageName;
- map_["_class_name"] = className;
- map_["_object_id"] = objectId;
-
- map_["_v1_config"] = encodedV1Config;
- map_["_v1_inst"] = encodedV1Inst;
- map_["_v2_data"] = encodedV2;
-
- MapCodec::encode(map_, toBuffer);
-}
-
// Remove Deleted objects, and save for later publishing...
bool ManagementAgent::moveDeletedObjects() {
typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList;
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1422061-1438053
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Jan 25 18:20:39 2013
@@ -26,7 +26,6 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -47,6 +46,9 @@ namespace qpid {
namespace broker {
class ConnectionState;
}
+namespace sys {
+class Timer;
+}
namespace management {
class ManagementAgent
@@ -75,11 +77,6 @@ public:
/** Called before plugins are initialized */
void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
- /** Called after plugins are initialized. */
- void pluginsInitialized();
-
- /** Called by cluster to suppress management output during update. */
- void suppress(bool s) { suppressed = s; }
void setName(const std::string& vendor,
const std::string& product,
@@ -112,8 +109,6 @@ public:
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
- QPID_BROKER_EXTERN void clusterUpdate();
-
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args,
@@ -123,25 +118,6 @@ public:
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
- /** Disallow all QMFv1 methods (used in clustered brokers). */
- void disallowV1Methods() { disallowAllV1Methods = true; }
-
- /** Serialize my schemas as a binary blob into schemaOut */
- void exportSchemas(std::string& schemaOut);
-
- /** Serialize my remote-agent map as a binary blob into agentsOut */
- void exportAgents(std::string& agentsOut);
-
- /** Decode a serialized schemas and add to my schema cache */
- void importSchemas(framing::Buffer& inBuf);
-
- /** Decode a serialized agent map */
- void importAgents(framing::Buffer& inBuf);
-
- // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
- uint64_t getNextObjectId(void) { return nextObjectId; }
- void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
@@ -150,20 +126,11 @@ public:
static types::Variant::Map toMap(const framing::FieldTable& from);
- // For Clustering: management objects that have been marked as
- // "deleted", but are waiting for their last published object
- // update are not visible to the cluster replication code. These
- // interfaces allow clustering to gather up all the management
- // objects that are deleted in order to allow all clustered
- // brokers to publish the same set of deleted objects.
-
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
- DeletedObject( const std::string &encoded );
~DeletedObject() {};
- void encode( std::string& toBuffer );
const std::string getKey() const {
// used to batch up objects of the same class type
return std::string(packageName + std::string(":") + className);
@@ -183,22 +150,7 @@ public:
typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
- /** returns a snapshot of all currently deleted management objects. */
- void exportDeletedObjects( DeletedObjectList& outList );
-
- /** Import a list of deleted objects to send on next publish interval. */
- void importDeletedObjects( const DeletedObjectList& inList );
-
private:
- struct Periodic : public qpid::sys::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1422061-1438053
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Jan 25 18:20:39 2013
@@ -136,12 +136,6 @@ MessageStorePlugin::providerAvailable(co
QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
}
-void
-MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
-{
- QPID_LOG(info, "Store: truncateInit");
-}
-
/**
* Record the existence of a durable queue
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/MessageStorePlugin.h Fri Jan 25 18:20:39 2013
@@ -24,18 +24,22 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
-#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/PersistableExchange.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/management/Manageable.h"
+//#include "qpid/management/Manageable.h"
#include <string>
using namespace qpid;
namespace qpid {
+
+namespace broker {
+class Broker;
+class PersistableExchange;
+class PersistableMessage;
+class PersistableQueue;
+}
+
namespace store {
class StorageProvider;
@@ -82,18 +86,6 @@ class MessageStorePlugin :
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called before recovery, will discard the database and reinitialize
- * using an empty store. This is used when cluster nodes recover and
- * must get their content from a cluster sync rather than directly from
- * the store.
- *
- * @param saveStoreContent If true, the store's contents should be
- * saved to a backup location before
- * reinitializing the store content.
- */
- virtual void truncateInit(const bool saveStoreContent = false);
/**
* Record the existence of a durable queue
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/StorageProvider.h Fri Jan 25 18:20:39 2013
@@ -143,20 +143,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get thier content from a cluster sync rather than directly fromt the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
/**
* Record the existence of a durable queue
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Jan 25 18:20:39 2013
@@ -26,6 +26,7 @@
#include <string>
#include <windows.h>
#include <clfsw32.h>
+#include <qpid/broker/Broker.h>
#include <qpid/broker/RecoverableQueue.h>
#include <qpid/log/Statement.h>
#include <qpid/store/MessageStorePlugin.h>
@@ -108,20 +109,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get their content from a cluster sync rather than directly from the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false);
/**
* Record the existence of a durable queue
@@ -467,11 +454,6 @@ MSSqlClfsProvider::activate(MessageStore
}
void
-MSSqlClfsProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
MSSqlClfsProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Fri Jan 25 18:20:39 2013
@@ -92,20 +92,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get thier content from a cluster sync rather than directly fromt the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false);
/**
* Record the existence of a durable queue
@@ -392,11 +378,6 @@ MSSqlProvider::activate(MessageStorePlug
}
void
-MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
MSSqlProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Jan 25 18:20:39 2013
@@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys:
}
};
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool nodict0) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
- isClient(false),
+ isClient(isClient0),
nodict(nodict0),
readCredit(InfiniteCredit)
{}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Fri Jan 25 18:20:39 2013
@@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputCon
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool nodict);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
- QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
// Output side
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/FileSysDir.h Fri Jan 25 18:20:39 2013
@@ -54,6 +54,15 @@ class FileSysDir
void mkdir(void);
+ typedef void Callback(const std::string&);
+
+ /**
+ * Call the Callback function for every regular file in the directory
+ *
+ * @param cb Callback function that receives the full path to the file
+ */
+ void forEachFile(Callback cb) const;
+
std::string getPath () { return dirPath; }
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/OutputControl.h Fri Jan 25 18:20:39 2013
@@ -1,3 +1,6 @@
+#ifndef QPID_SYS_OUTPUT_CONTROL_H
+#define QPID_SYS_OUTPUT_CONTROL_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,9 +24,6 @@
#include "qpid/sys/IntegerTypes.h"
-#ifndef _OutputControl_
-#define _OutputControl_
-
namespace qpid {
namespace sys {
@@ -40,4 +40,4 @@ namespace sys {
}
-#endif
+#endif /*!QPID_SYS_OUTPUT_CONTROL_H*/
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ProtocolFactory.h Fri Jan 25 18:20:39 2013
@@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::Sha
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
@@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputContr
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getFullName()),
+ identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
factory(f),
codec(0),
readError(false),
@@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public Pro
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
@@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Po
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& /*name*/,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -76,15 +77,16 @@ class SslProtocolFactory : public Protoc
SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
@@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(c
}
}
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
-
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, nodict);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient) {
- async->setClient();
- }
-
AsynchIO* aio = AsynchIO::create(
s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed(
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -289,8 +295,8 @@ void SslProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&SslProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&SslProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public P
public:
AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
@@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtoco
}
}
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
+
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
- async->setClient();
AsynchIO* aio = AsynchIO::create
(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Pol
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFai
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&AsynchIOProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.cpp Fri Jan 25 18:20:39 2013
@@ -96,12 +96,7 @@ void TimerTask::cancel() {
state = CANCELLED;
}
-void TimerTask::setFired() {
- // Set nextFireTime to just before now, making readyToFire() true.
- nextFireTime = AbsTime(sys::now(), Duration(-1));
-}
-
-
+// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
@@ -133,7 +128,6 @@ public:
}
};
-// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary
void Timer::run()
{
Monitor::ScopedLock l(monitor);
@@ -151,10 +145,6 @@ void Timer::run()
{
TimerTaskCallbackScope s(*t);
if (s) {
- {
- Monitor::ScopedUnlock u(monitor);
- drop(t);
- }
if (delay > lateCancel) {
QPID_LOG(debug, t->name << " cancelled timer woken up " <<
delay / TIME_MSEC << "ms late");
@@ -235,9 +225,6 @@ void Timer::fire(boost::intrusive_ptr<Ti
}
}
-// Provided for subclasses: called when a task is droped.
-void Timer::drop(boost::intrusive_ptr<TimerTask>) {}
-
bool operator<(const intrusive_ptr<TimerTask>& a,
const intrusive_ptr<TimerTask>& b)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/Timer.h Fri Jan 25 18:20:39 2013
@@ -67,10 +67,6 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
- // Move the nextFireTime so readyToFire is true.
- // Used by the cluster, where tasks are fired on cluster events, not on local time.
- QPID_COMMON_EXTERN void setFired();
-
protected:
// Must be overridden with callback
virtual void fire() = 0;
@@ -99,7 +95,7 @@ class Timer : private Runnable {
protected:
QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
- QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
+
// Allow derived classes to change the late/overran thresholds.
Duration late;
Duration overran;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp Fri Jan 25 18:20:39 2013
@@ -162,11 +162,6 @@ void BSDSocket::connect(const SocketAddr
// remote port (which is unoccupied) as the port to bind the local
// end of the socket, resulting in a "circular" connection.
//
- // This seems like something the OS should prevent but I have
- // confirmed that sporadic hangs in
- // cluster_tests.LongTests.test_failover on RHEL5 are caused by
- // such a circular connection.
- //
// Raise an error if we see such a connection, since we know there is
// no listener on the peer address.
//
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp Fri Jan 25 18:20:39 2013
@@ -18,6 +18,7 @@
#include "qpid/sys/FileSysDir.h"
#include "qpid/sys/StrError.h"
+#include "qpid/log/Statement.h"
#include "qpid/Exception.h"
#include <sys/types.h>
@@ -25,6 +26,8 @@
#include <fcntl.h>
#include <cerrno>
#include <unistd.h>
+#include <dirent.h>
+#include <stdlib.h>
namespace qpid {
namespace sys {
@@ -51,4 +54,27 @@ void FileSysDir::mkdir(void)
throw Exception ("Can't create directory: " + dirPath);
}
+void FileSysDir::forEachFile(Callback cb) const {
+
+ ::dirent** namelist;
+
+ int n = scandir(dirPath.c_str(), &namelist, 0, alphasort);
+ if (n == -1) throw Exception (strError(errno) + ": Can't scan directory: " + dirPath);
+
+ for (int i = 0; i<n; ++i) {
+ std::string fullpath = dirPath + "/" + namelist[i]->d_name;
+ // Filter out non files/stat problems etc.
+ struct ::stat s;
+ // Can't throw here without leaking memory, so just do nothing with
+ // entries for which stat() fails.
+ if (!::stat(fullpath.c_str(), &s)) {
+ if (S_ISREG(s.st_mode)) {
+ cb(fullpath);
+ }
+ }
+ ::free(namelist[i]);
+ }
+ ::free(namelist);
+}
+
}} // namespace qpid::sys
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -77,51 +77,6 @@ inline bool isLoopback(const ::sockaddr*
}
}
-void SystemInfo::getLocalIpAddresses (uint16_t port,
- std::vector<Address> &addrList) {
- ::ifaddrs* ifaddr = 0;
- QPID_POSIX_CHECK(::getifaddrs(&ifaddr));
- for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) {
- if (ifap->ifa_addr == 0) continue;
- if (isLoopback(ifap->ifa_addr)) continue;
- int family = ifap->ifa_addr->sa_family;
- switch (family) {
- case AF_INET6: {
- // Ignore link local addresses as:
- // * The scope id is illegal in URL syntax
- // * Clients won't be able to use a link local address
- // without adding their own (potentially different) scope id
- sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr);
- if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
- // Fallthrough
- }
- case AF_INET: {
- char dispName[NI_MAXHOST];
- int rc = ::getnameinfo(
- ifap->ifa_addr,
- (family == AF_INET)
- ? sizeof(struct sockaddr_in)
- : sizeof(struct sockaddr_in6),
- dispName, sizeof(dispName),
- 0, 0, NI_NUMERICHOST);
- if (rc != 0) {
- throw QPID_POSIX_ERROR(rc);
- }
- string addr(dispName);
- addrList.push_back(Address(TCP, addr, port));
- break;
- }
- default:
- continue;
- }
- }
- ::freeifaddrs(ifaddr);
-
- if (addrList.empty()) {
- addrList.push_back(Address(TCP, LOOPBACK, port));
- }
-}
-
namespace {
inline socklen_t sa_len(::sockaddr* sa)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/solaris/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -60,31 +60,6 @@ bool SystemInfo::getLocalHostname(Addres
static const string LOCALHOST("127.0.0.1");
static const string TCP("tcp");
-void SystemInfo::getLocalIpAddresses(uint16_t port,
- std::vector<Address> &addrList) {
- int s = socket(PF_INET, SOCK_STREAM, 0);
- for (int i=1;;i++) {
- struct lifreq ifr;
- ifr.lifr_index = i;
- if (::ioctl(s, SIOCGIFADDR, &ifr) < 0) {
- break;
- }
- struct sockaddr *sa = static_cast<struct sockaddr *>((void *) &ifr.lifr_addr);
- if (sa->sa_family != AF_INET) {
- // TODO: Url parsing currently can't cope with IPv6 addresses, defer for now
- break;
- }
- struct sockaddr_in *sin = static_cast<struct sockaddr_in *>((void *)sa);
- std::string addr(inet_ntoa(sin->sin_addr));
- if (addr != LOCALHOST)
- addrList.push_back(Address(TCP, addr, port));
- }
- if (addrList.empty()) {
- addrList.push_back(Address(TCP, LOCALHOST, port));
- }
- close (s);
-}
-
void SystemInfo::getSystemId(std::string &osName,
std::string &nodeName,
std::string &release,
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/ssl/util.cpp Fri Jan 25 18:20:39 2013
@@ -31,8 +31,6 @@
#include <iostream>
#include <fstream>
-#include <boost/filesystem/operations.hpp>
-#include <boost/filesystem/path.hpp>
namespace qpid {
namespace sys {
@@ -82,15 +80,14 @@ SslOptions SslOptions::global;
char* readPasswordFromFile(PK11SlotInfo*, PRBool retry, void*)
{
const std::string& passwordFile = SslOptions::global.certPasswordFile;
- if (retry || passwordFile.empty() || !boost::filesystem::exists(passwordFile)) {
- return 0;
- } else {
- std::ifstream file(passwordFile.c_str());
- std::string password;
- file >> password;
- return PL_strdup(password.c_str());
- }
-}
+ if (retry || passwordFile.empty()) return 0;
+ std::ifstream file(passwordFile.c_str());
+ if (!file) return 0;
+
+ std::string password;
+ file >> password;
+ return PL_strdup(password.c_str());
+}
void initNSS(const SslOptions& options, bool server)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp Fri Jan 25 18:20:39 2013
@@ -24,6 +24,9 @@
#include <sys/stat.h>
#include <direct.h>
#include <errno.h>
+#include <windows.h>
+#include <strsafe.h>
+
namespace qpid {
namespace sys {
@@ -50,4 +53,36 @@ void FileSysDir::mkdir(void)
throw Exception ("Can't create directory: " + dirPath);
}
+void FileSysDir::forEachFile(Callback cb) const {
+
+ WIN32_FIND_DATAA findFileData;
+ char szDir[MAX_PATH];
+ size_t dirPathLength;
+ HANDLE hFind = INVALID_HANDLE_VALUE;
+
+ // create dirPath+"\*" in szDir
+ StringCchLength (dirPath.c_str(), MAX_PATH, &dirPathLength);
+
+ if (dirPathLength > (MAX_PATH - 3)) {
+ throw Exception ("Directory path is too long: " + dirPath);
+ }
+
+ StringCchCopy(szDir, MAX_PATH, dirPath.c_str());
+ StringCchCat(szDir, MAX_PATH, TEXT("\\*"));
+
+ // Special work for first file
+ hFind = FindFirstFileA(szDir, &findFileData);
+ if (INVALID_HANDLE_VALUE == hFind) {
+ return;
+ }
+
+ // process everything that isn't a directory
+ do {
+ if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) {
+ std::string fileName(findFileData.cFileName);
+ cb(fileName);
+ }
+ } while (FindNextFile(hFind, &findFileData) != 0);
+}
+
}} // namespace qpid::sys
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp Fri Jan 25 18:20:39 2013
@@ -67,35 +67,6 @@ bool SystemInfo::getLocalHostname (Addre
static const std::string LOCALHOST("127.0.0.1");
static const std::string TCP("tcp");
-void SystemInfo::getLocalIpAddresses (uint16_t port,
- std::vector<Address> &addrList) {
- enum { MAX_URL_INTERFACES = 100 };
-
- SOCKET s = socket (PF_INET, SOCK_STREAM, 0);
- if (s != INVALID_SOCKET) {
- INTERFACE_INFO interfaces[MAX_URL_INTERFACES];
- DWORD filledBytes = 0;
- WSAIoctl (s,
- SIO_GET_INTERFACE_LIST,
- 0,
- 0,
- interfaces,
- sizeof (interfaces),
- &filledBytes,
- 0,
- 0);
- unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO);
- for (unsigned int i = 0; i < interfaceCount; ++i) {
- if (interfaces[i].iiFlags & IFF_UP) {
- std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr));
- if (addr != LOCALHOST)
- addrList.push_back(Address(TCP, addr, port));
- }
- }
- closesocket (s);
- }
-}
-
// Null function which always fails to find an network interface name
bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/types/Variant.cpp Fri Jan 25 18:20:39 2013
@@ -110,21 +110,28 @@ class VariantImpl
} value;
std::string encoding;//optional encoding for variable length data
- template<class T> T convertFromString() const
+ template<class T> T convertFromString() const
{
const std::string& s = *value.string;
+
try {
- T r = boost::lexical_cast<T>(s);
- //lexical_cast won't fail if string is a negative number and T is unsigned
- //So check that and allow special case of negative zero
- //else its a non-zero negative number so throw exception at end of function
- if (std::numeric_limits<T>::is_signed || s.find('-') != 0 || r == 0) {
- return r;
+ // Extra shenanigans to work around negative zero
+ // conversion error in older GCC libs.
+ if ( s[0] != '-' ) {
+ return boost::lexical_cast<T>(s);
+ } else {
+ T r = boost::lexical_cast<T>(s.substr(1));
+ if (std::numeric_limits<T>::is_signed) {
+ return -r;
+ } else {
+ if (r==0) return 0;
+ }
}
} catch(const boost::bad_lexical_cast&) {
}
throw InvalidConversion(QPID_MSG("Cannot convert " << s));
}
+
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpidd.cpp Fri Jan 25 18:20:39 2013
@@ -75,17 +75,19 @@ int run_broker(int argc, char *argv[], b
for (vector<string>::iterator iter = bootOptions.module.load.begin();
iter != bootOptions.module.load.end();
iter++)
- qpid::tryShlib (iter->data(), false);
+ qpid::tryShlib (*iter);
if (!bootOptions.module.noLoad) {
bool isDefault = defaultPath == bootOptions.module.loadDir;
qpid::loadModuleDir (bootOptions.module.loadDir, isDefault);
}
- // Parse options
+ // Parse options. In the second pass, do not allow unknown options.
+ // All the modules have been added now, so any unknown options
+ // should be flagged as errors.
try {
options.reset(new QpiddOptions(argv[0]));
- options->parse(argc, argv, options->common.config);
+ options->parse(argc, argv, options->common.config, false);
} catch (const std::exception& /*e*/) {
if (helpArgSeen) {
// provide help even when parsing fails
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk Fri Jan 25 18:20:39 2013
@@ -50,7 +50,8 @@ sslconnector_la_SOURCES = \
if HAVE_PROTON
sslconnector_la_SOURCES += \
- qpid/messaging/amqp/SslTransport.cpp
+ qpid/messaging/amqp/SslTransport.cpp \
+ qpid/messaging/amqp/SslTransport.h
endif #HAVE_PROTON
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1422061-1438053
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/.valgrind.supp Fri Jan 25 18:20:39 2013
@@ -39,21 +39,6 @@
fun:_sasl_load_plugins
fun:sasl_client_init
}
-{
- Benign leak in CPG - patched version.
- Memcheck:Leak
- fun:*
- fun:openais_service_connect
- fun:cpg_initialize
-}
-
-{
- Benign error in libcpg.
- Memcheck:Param
- socketcall.sendmsg(msg.msg_iov[i])
- obj:*/libpthread-2.5.so
- obj:*/libcpg.so.2.0.0
-}
{
Uninitialised value problem in _dl_relocate (F7, F8)
@@ -161,14 +146,6 @@
}
{
- CPG error - seems benign.
- Memcheck:Param
- socketcall.sendmsg(msg.msg_iov[i])
- obj:*
- obj:*/libcpg.so.2.0.0
-}
-
-{
Known leak in boost.thread 1.33.1. Wildcards for 64/32 bit diffs.
Memcheck:Leak
fun:*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org