You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/02/18 11:18:29 UTC
svn commit: r1447183 [3/7] - in /qpid/branches/java-broker-config-qpid-4390:
./ qpid/ qpid/bin/ qpid/cpp/bindings/ qpid/cpp/bindings/qpid/dotnet/
qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/
qpid/cpp/bindings/qpid/perl/lib/ qpid/c...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclData.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclData.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclData.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclData.h Mon Feb 18 10:18:24 2013
@@ -111,6 +111,8 @@ public:
typedef std::map<std::string, ruleSet > actionObject; // user
typedef actionObject::iterator actObjItr;
typedef actionObject* aclAction;
+ typedef std::map<std::string, uint16_t> quotaRuleSet; // <username, N>
+ typedef quotaRuleSet::const_iterator quotaRuleSetItr;
// Action*[] -> Object*[] -> map<user -> set<Rule> >
aclAction* actionList[qpid::acl::ACTIONSIZE];
@@ -134,9 +136,18 @@ public:
bool matchProp(const std::string & src, const std::string& src1);
void clear ();
- static const std::string USER_SUBSTITUTION_KEYWORD;
- static const std::string DOMAIN_SUBSTITUTION_KEYWORD;
- static const std::string USERDOMAIN_SUBSTITUTION_KEYWORD;
+ static const std::string ACL_KEYWORD_USER_SUBST;
+ static const std::string ACL_KEYWORD_DOMAIN_SUBST;
+ static const std::string ACL_KEYWORD_USERDOMAIN_SUBST;
+ static const std::string ACL_KEYWORD_ALL;
+ static const std::string ACL_KEYWORD_ACL;
+ static const std::string ACL_KEYWORD_GROUP;
+ static const std::string ACL_KEYWORD_QUOTA;
+ static const std::string ACL_KEYWORD_QUOTA_CONNECTIONS;
+ static const char ACL_SYMBOL_WILDCARD;
+ static const std::string ACL_KEYWORD_WILDCARD;
+ static const char ACL_SYMBOL_LINE_CONTINUATION;
+
void substituteString(std::string& targetString,
const std::string& placeholder,
const std::string& replacement);
@@ -146,6 +157,31 @@ public:
void substituteKeywords(std::string& ruleString,
const std::string& userId);
+ // Per user connection quotas extracted from acl rule file
+ // Set by reader
+ void setConnQuotaRuleSettings (bool, boost::shared_ptr<quotaRuleSet>);
+ // Get by connection approvers
+ bool enforcingConnectionQuotas() { return connQuotaRulesExist; }
+ bool getConnQuotaForUser(const std::string&, uint16_t*) const;
+
+ /** getConnectMaxSpec
+ * Connection quotas are stored in uint16_t variables.
+ * Returns the largest value a user may declare for a connection
+ * quota in an Acl file. The ceiling sits just below the uint16_t
+ * maximum for two reasons: 1. It reserves the values above it for
+ * magic numbers that keyword names in Acl files may stand for, so
+ * they cannot collide with ordinary user-declared values, and 2. It
+ * keeps unsigned arithmetic performed near _MAX from accidentally
+ * wrapping back to zero.
+ */
+ static uint16_t getConnectMaxSpec() {
+ return 65530; // same limit as the text returned by getMaxConnectSpecStr()
+ }
+ static std::string getMaxConnectSpecStr() { // text form of the connection quota ceiling
+ return "65530"; // duplicates the number in getConnectMaxSpec(); keep the two in sync
+ }
+
+
AclData();
virtual ~AclData();
@@ -157,6 +193,10 @@ private:
bool compareIntMin(const qpid::acl::SpecProperty theProperty,
const std::string theAclValue,
const std::string theLookupValue);
+
+ // Per-user connection quota
+ bool connQuotaRulesExist;
+ boost::shared_ptr<quotaRuleSet> connQuotaRuleSettings; // Map of user-to-N values from rule file
};
}} // namespace qpid::acl
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclPlugin.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclPlugin.cpp Mon Feb 18 10:18:24 2013
@@ -42,8 +42,8 @@ struct AclOptions : public Options {
values.aclMaxConnectTotal = 500;
addOptions()
("acl-file", optValue(values.aclFile, "FILE"), "The policy file to load from, loaded from data dir")
- ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.")
("connection-limit-per-user", optValue(values.aclMaxConnectPerUser, "N"), "The maximum number of connections allowed per user. 0 implies no limit.")
+ ("max-connections" , optValue(values.aclMaxConnectTotal, "N"), "The maximum combined number of connections allowed. 0 implies no limit.")
("connection-limit-per-ip" , optValue(values.aclMaxConnectPerIp, "N"), "The maximum number of connections allowed per host IP address. 0 implies no limit.")
("max-queues-per-user", optValue(values.aclMaxQueuesPerUser, "N"), "The maximum number of queues allowed per user. 0 implies no limit.")
;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.cpp Mon Feb 18 10:18:24 2013
@@ -24,6 +24,7 @@
#include <sstream>
#include "qpid/log/Statement.h"
#include "qpid/Exception.h"
+#include <boost/lexical_cast.hpp>
#include <iomanip> // degug
#include <iostream> // debug
@@ -95,7 +96,7 @@ namespace acl {
<< cnt << " " << (*i)->toString());
if (!foundmode && (*i)->actionAll && (*i)->names.size() == 1
- && (*((*i)->names.begin())).compare("*") == 0) {
+ && (*((*i)->names.begin())).compare(AclData::ACL_KEYWORD_WILDCARD) == 0) {
d->decisionMode = (*i)->res;
QPID_LOG(debug, "ACL: FoundMode "
<< AclHelper::getAclResultStr(d->decisionMode));
@@ -105,9 +106,9 @@ namespace acl {
// Record which properties have the user substitution string
for (pmCitr pItr=rule.props.begin(); pItr!=rule.props.end(); pItr++) {
- if ((pItr->second.find(AclData::USER_SUBSTITUTION_KEYWORD, 0) != std::string::npos) ||
- (pItr->second.find(AclData::DOMAIN_SUBSTITUTION_KEYWORD, 0) != std::string::npos) ||
- (pItr->second.find(AclData::USERDOMAIN_SUBSTITUTION_KEYWORD, 0) != std::string::npos)) {
+ if ((pItr->second.find(AclData::ACL_KEYWORD_USER_SUBST, 0) != std::string::npos) ||
+ (pItr->second.find(AclData::ACL_KEYWORD_DOMAIN_SUBST, 0) != std::string::npos) ||
+ (pItr->second.find(AclData::ACL_KEYWORD_USERDOMAIN_SUBST, 0) != std::string::npos)) {
rule.ruleHasUserSub[pItr->first] = true;
}
}
@@ -167,7 +168,7 @@ namespace acl {
// add users and Rule to object set
bool allNames = false;
// check to see if names.begin is '*'
- if ((*(*i)->names.begin()).compare("*") == 0)
+ if ((*(*i)->names.begin()).compare(AclData::ACL_KEYWORD_WILDCARD) == 0)
allNames = true;
for (nsCitr itr = (allNames ? names.begin() : (*i)->names.begin());
@@ -199,7 +200,7 @@ namespace acl {
objstr << AclHelper::getObjectTypeStr((ObjectType) ocnt) << ",";
}
- bool allNames = ((*(*i)->names.begin()).compare("*") == 0);
+ bool allNames = ((*(*i)->names.begin()).compare(AclData::ACL_KEYWORD_WILDCARD) == 0);
std::ostringstream userstr;
for (nsCitr itr = (allNames ? names.begin() : (*i)->names.begin());
itr != (allNames ? names.end() : (*i)->names.end());
@@ -218,12 +219,15 @@ namespace acl {
<< "}" );
}
}
+
+ // connection quota
+ d->setConnQuotaRuleSettings(connQuotaRulesExist, connQuota);
}
void AclReader::aclRule::processName(const std::string& name, const groupMap& groups) {
- if (name.compare("all") == 0) {
- names.insert("*");
+ if (name.compare(AclData::ACL_KEYWORD_ALL) == 0) {
+ names.insert(AclData::ACL_KEYWORD_WILDCARD);
} else {
gmCitr itr = groups.find(name);
if (itr == groups.end()) {
@@ -234,9 +238,13 @@ namespace acl {
}
}
- AclReader::AclReader() : lineNumber(0), contFlag(false), validationMap(new AclHelper::objectMap) {
+ AclReader::AclReader(uint16_t theCliMaxConnPerUser) : lineNumber(0), contFlag(false),
+ validationMap(new AclHelper::objectMap),
+ cliMaxConnPerUser (theCliMaxConnPerUser),
+ connQuotaRulesExist(false),
+ connQuota(new AclData::quotaRuleSet) {
AclHelper::loadValidationMap(validationMap);
- names.insert("*");
+ names.insert(AclData::ACL_KEYWORD_WILDCARD);
}
AclReader::~AclReader() {}
@@ -254,6 +262,11 @@ namespace acl {
errorStream << "Unable to open ACL file \"" << fn << "\": eof=" << (ifs.eof()?"T":"F") << "; fail=" << (ifs.fail()?"T":"F") << "; bad=" << (ifs.bad()?"T":"F");
return -1;
}
+ // Propagate nonzero per-user max connection setting from CLI
+ if (cliMaxConnPerUser > 0) {
+ (*connQuota)[AclData::ACL_KEYWORD_ACL] = cliMaxConnPerUser;
+ }
+ // Loop to process the Acl file
try {
bool err = false;
while (ifs.good()) {
@@ -282,6 +295,7 @@ namespace acl {
}
printNames();
printRules();
+ printConnectionQuotas();
loadDecisionData(d);
return 0;
@@ -292,7 +306,7 @@ namespace acl {
std::vector<std::string> toks;
// Check for continuation
- char* contCharPtr = std::strrchr(line, '\\');
+ char* contCharPtr = std::strrchr(line, AclData::ACL_SYMBOL_LINE_CONTINUATION);
bool cont = contCharPtr != 0;
if (cont) *contCharPtr = 0;
@@ -303,10 +317,12 @@ namespace acl {
return false;
}
- if (numToks && (toks[0].compare("group") == 0 || contFlag)) {
+ if (numToks && (toks[0].compare(AclData::ACL_KEYWORD_GROUP) == 0 || contFlag)) {
ret = processGroupLine(toks, cont);
- } else if (numToks && toks[0].compare("acl") == 0) {
+ } else if (numToks && toks[0].compare(AclData::ACL_KEYWORD_ACL) == 0) {
ret = processAclLine(toks);
+ } else if (numToks && toks[0].compare(AclData::ACL_KEYWORD_QUOTA) == 0) {
+ ret = processQuotaLine(toks);
} else {
// Check for whitespace only line, ignore these
bool ws = true;
@@ -317,7 +333,10 @@ namespace acl {
ret = true;
} else {
errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
- << ", Non-continuation line must start with \"group\" or \"acl\".";
+ << ", Non-continuation line must start with \""
+ << AclData::ACL_KEYWORD_GROUP << "\", \""
+ << AclData::ACL_KEYWORD_ACL << "\". or \""
+ << AclData::ACL_KEYWORD_QUOTA << "\".";
ret = false;
}
}
@@ -337,6 +356,102 @@ namespace acl {
return cnt;
}
+
+ // Process a 'quota' rule line: toks[1] names the quota type and selects the handler for the rest of the line.
+ // Return true if the line is successfully processed without errors; on failure the reason is written to errorStream.
+ bool AclReader::processQuotaLine(tokList& toks) {
+ const unsigned toksSize = toks.size();
+ const unsigned minimumSize = 3; // the quota keyword, the quota type and the quota value
+ if (toksSize < minimumSize) {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Insufficient tokens for quota definition.";
+ return false;
+ }
+
+ if (toks[1].compare(AclData::ACL_KEYWORD_QUOTA_CONNECTIONS) == 0) { // the only quota type handled here
+ return processQuotaConnLine(toks);
+ } else {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Quota type \"" << toks[1] << "\" unrecognized.";
+ return false;
+ }
+ }
+
+
+ // Process a 'quota connections' rule line: toks[2] is the connection count, toks[3] onward are user or group names.
+ // Return true if the line is successfully processed without errors; on failure the reason is written to errorStream.
+ bool AclReader::processQuotaConnLine(tokList& toks) {
+ const unsigned toksSize = toks.size();
+
+ uint16_t nConns(0);
+ try {
+ nConns = boost::lexical_cast<uint16_t>(toks[2]); // processQuotaLine has already checked that toks[2] exists
+ } catch(const boost::bad_lexical_cast&) {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Connection quota value \"" << toks[2]
+ << "\" cannot be converted to a 16-bit unsigned integer.";
+ return false;
+ }
+
+ // Reject counts above the largest quota value a rule is allowed to declare
+ if (nConns > AclData::getConnectMaxSpec())
+ {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Connection quota value \"" << toks[2]
+ << "\" exceeds maximum configuration setting of "
+ << AclData::getConnectMaxSpec();
+ return false;
+ }
+
+ // Apply the connection count to every name in the rule; with exactly three tokens the loop body never runs
+ for (unsigned idx = 3; idx < toksSize; idx++) {
+ if (groups.find(toks[idx]) == groups.end()) {
+ // Not a known group name, so the token is recorded as an individual user
+ (*connQuota)[toks[idx]] = nConns; // a later rule for the same name overwrites an earlier one
+ } else {
+ if (!processQuotaConnGroup(toks[idx], nConns))
+ return false;
+ }
+ }
+ return true; // NOTE(review): connQuotaRulesExist is not updated in this function - confirm where it is set to true
+ }
+
+
+ // Apply a connection quota to every member of a group, recursing into members that are themselves group names.
+ // Return true if the quota is applied to all members of the group; false (with errorStream set) if theGroup is unknown.
+ bool AclReader::processQuotaConnGroup(const std::string& theGroup, uint16_t theQuota) {
+ gmCitr citr = groups.find(theGroup);
+
+ if (citr == groups.end()) {
+ errorStream << ACL_FORMAT_ERR_LOG_PREFIX << "Line : " << lineNumber
+ << ", Failed to expand group \"" << theGroup << "\".";
+ return false;
+ }
+
+ for (nsCitr gni=citr->second->begin(); gni!=citr->second->end(); gni++) {
+ if (groups.find(*gni) == groups.end()) { // member is not a group name: record it as an individual
+ (*connQuota)[*gni] = theQuota;
+ } else {
+ if (!processQuotaConnGroup(*gni, theQuota)) // NOTE(review): no cycle guard here - presumably groups cannot contain each other; confirm
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ void AclReader::printConnectionQuotas() const { // writes the connection quota map to the log at debug level
+ QPID_LOG(debug, "ACL: connection quota: " << (*connQuota).size() << " rules found:");
+ int cnt = 1; // 1-based entry number, used only in the log text
+ for (AclData::quotaRuleSetItr itr=(*connQuota).begin();
+ itr != (*connQuota).end();
+ ++itr,++cnt) {
+ QPID_LOG(debug, "ACL: quota " << cnt << " : " << (*itr).second
+ << " connections for " << (*itr).first)
+ }
+ }
+
+
// Return true if the line is successfully processed without errors
// If cont is true, then groupName must be set to the continuation group name
bool AclReader::processGroupLine(tokList& toks, const bool cont) {
@@ -462,8 +577,8 @@ namespace acl {
return false;
}
- bool actionAllFlag = toks[3].compare("all") == 0;
- bool userAllFlag = toks[2].compare("all") == 0;
+ bool actionAllFlag = toks[3].compare(AclData::ACL_KEYWORD_ALL) == 0;
+ bool userAllFlag = toks[2].compare(AclData::ACL_KEYWORD_ALL) == 0;
Action action;
if (actionAllFlag) {
@@ -492,7 +607,7 @@ namespace acl {
}
if (toksSize >= 5) { // object name-value pair
- if (toks[4].compare("all") == 0) {
+ if (toks[4].compare(AclData::ACL_KEYWORD_ALL) == 0) {
rule->setObjectTypeAll();
} else {
try {
@@ -526,7 +641,7 @@ namespace acl {
}
}
// Check if name (toks[2]) is group; if not, add as name of individual
- if (toks[2].compare("all") != 0) {
+ if (toks[2].compare(AclData::ACL_KEYWORD_ALL) != 0) {
if (groups.find(toks[2]) == groups.end()) {
addName(toks[2]);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclReader.h Mon Feb 18 10:18:24 2013
@@ -28,6 +28,7 @@
#include <sstream>
#include <memory>
#include "qpid/acl/AclData.h"
+#include "qpid/acl/Acl.h"
#include "qpid/broker/AclModule.h"
namespace qpid {
@@ -96,7 +97,7 @@ class AclReader {
std::ostringstream errorStream;
public:
- AclReader();
+ AclReader(uint16_t cliMaxConnPerUser);
virtual ~AclReader();
int read(const std::string& fn, boost::shared_ptr<AclData> d); // return=0 for success
std::string getError();
@@ -116,8 +117,17 @@ class AclReader {
void printRules() const; // debug aid
bool isValidUserName(const std::string& name);
+ bool processQuotaLine(tokList& toks);
+ bool processQuotaConnLine(tokList& toks);
+ bool processQuotaConnGroup(const std::string&, uint16_t);
+ void printConnectionQuotas() const;
+
static bool isValidGroupName(const std::string& name);
static nvPair splitNameValuePair(const std::string& nvpString);
+
+ const uint16_t cliMaxConnPerUser;
+ bool connQuotaRulesExist;
+ boost::shared_ptr<AclData::quotaRuleSet> connQuota;
};
}} // namespace qpid::acl
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclTopicMatch.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclTopicMatch.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclTopicMatch.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/AclTopicMatch.h Mon Feb 18 10:18:24 2013
@@ -30,7 +30,7 @@ namespace qpid {
namespace broker {
// Class for executing topic exchange routing key matching rules in
-// Acl code the allows or denies users publishing to an exchange.
+// Acl code. Allows or denies users publishing to an exchange.
class TopicExchange::TopicExchangeTester {
class boundNode;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Mon Feb 18 10:18:24 2013
@@ -119,7 +119,6 @@ size_t Connection::encode(char* buffer,
void Connection::abort() { output.abort(); }
void Connection::activateOutput() { output.activateOutput(); }
-void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
void Connection::close() {
// No more frames can be pushed onto the queue.
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp_0_10/Connection.h Mon Feb 18 10:18:24 2013
@@ -66,7 +66,6 @@ class Connection : public sys::Connecti
bool canEncode();
void abort();
void activateOutput();
- void giveReadCredit(int32_t);
void closed(); // connection closed by peer.
void close(); // closing from this end.
void send(framing::AMQFrame&);
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1438054-1446845
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 18 10:18:24 2013
@@ -48,6 +48,8 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -558,7 +560,22 @@ Manageable::status_t Broker::ManagementM
status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
break;
}
- default:
+
+ case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP:
+ {
+ dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
+ QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP:
+ {
+ setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
+ QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
@@ -982,6 +999,18 @@ std::string Broker::getLogLevel()
return level;
}
+void Broker::setLogHiresTimestamp(bool enabled) // passes the flag on to qpid::log::Logger::instance()
+{
+ QPID_LOG(notice, "Changing log hires timestamp to " << enabled); // emitted before the setting is changed
+ qpid::log::Logger::instance().setHiresTimestamp(enabled);
+}
+
+bool Broker::getLogHiresTimestamp() // reports the hires-timestamp flag held by qpid::log::Logger::instance()
+{
+ return qpid::log::Logger::instance().getHiresTimestamp();
+}
+
+
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 18 10:18:24 2013
@@ -130,6 +130,8 @@ class Broker : public sys::Runnable, pub
void setStore ();
void setLogLevel(const std::string& level);
std::string getLogLevel();
+ void setLogHiresTimestamp(bool enabled);
+ bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp Mon Feb 18 10:18:24 2013
@@ -25,6 +25,7 @@
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Timer.h"
@@ -481,7 +482,6 @@ void Connection::OutboundFrameTracker::c
size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
void Connection::OutboundFrameTracker::abort() { next->abort(); }
void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
{
next->send(f);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h Mon Feb 18 10:18:24 2013
@@ -30,24 +30,13 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionHandler.h"
#include "qpid/broker/ConnectionState.h"
-#include "qpid/broker/SessionHandler.h"
-#include "qmf/org/apache/qpid/broker/Connection.h"
-#include "qpid/Exception.h"
-#include "qpid/RefCounted.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/ptr_map.h"
-#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+#include "qpid/ptr_map.h"
+
+#include "qmf/org/apache/qpid/broker/Connection.h"
#include <boost/ptr_container/ptr_map.hpp>
#include <boost/bind.hpp>
@@ -56,13 +45,16 @@
namespace qpid {
namespace sys {
+class Timer;
class TimerTask;
}
namespace broker {
class Broker;
class LinkRegistry;
+class Queue;
class SecureConnection;
+class SessionHandler;
struct ConnectionTimeoutTask;
class Connection : public sys::ConnectionInputHandler,
@@ -199,7 +191,6 @@ class Connection : public sys::Connectio
size_t getBuffered() const;
void abort();
void activateOutput();
- void giveReadCredit(int32_t credit);
void send(framing::AMQFrame&);
void wrap(sys::ConnectionOutputHandlerPtr&);
private:
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Feb 18 10:18:24 2013
@@ -32,6 +32,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/broker/AclModule.h"
#include "qpid/amqp_0_10/Codecs.h"
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Consumer.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Consumer.h Mon Feb 18 10:18:24 2013
@@ -86,6 +86,8 @@ class Consumer : public QueueCursor {
*/
virtual bool isCounted() { return true; }
+ QueueCursor getCursor() const { return *this; } // returns a copy of this consumer's QueueCursor base part
+ void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; } // assigns qc to the QueueCursor base part only
protected:
//framing::SequenceNumber position;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Mon Feb 18 10:18:24 2013
@@ -366,7 +366,8 @@ bool Queue::getNextMessage(Message& m, C
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
- Message* msg = messages->next(*c);
+ QueueCursor cursor = c->getCursor(); // Save current position.
+ Message* msg = messages->next(*c); // Advances c.
if (msg) {
if (msg->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
@@ -405,6 +406,7 @@ bool Queue::getNextMessage(Message& m, C
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ c->setCursor(cursor); // Restore cursor, will try again with credit
if (c->preAcquires()) {
//let someone else try
listeners.populate(set);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.cpp Mon Feb 18 10:18:24 2013
@@ -77,6 +77,7 @@ const QueueSettings::Aliases QueueSettin
QueueSettings::QueueSettings(bool d, bool a) :
durable(d),
autodelete(a),
+ isTemporary(false),
priorities(0),
defaultFairshare(0),
shareGroups(false),
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueSettings.h Mon Feb 18 10:18:24 2013
@@ -43,6 +43,7 @@ struct QueueSettings
bool durable;
bool autodelete;
+ bool isTemporary;
//basic queue types:
std::string lvqKey;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 18 10:18:24 2013
@@ -37,6 +37,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/FedOps.h"
@@ -389,7 +390,7 @@ bool SemanticStateConsumerImpl::accept(c
// remain on queue's listener list for possible smaller messages
// in future.
//
- blocked = !(filter(msg) && checkCredit(msg));
+ blocked = !checkCredit(msg);
return !blocked;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Feb 18 10:18:24 2013
@@ -294,6 +294,8 @@ void SessionAdapter::QueueHandlerImpl::d
} catch (const qpid::types::Exception& e) {
throw InvalidArgumentException(e.what());
}
+ // Identify queues that won't survive a failover.
+ settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay;
std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().createQueue(name, settings,
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb 18 10:18:24 2013
@@ -143,11 +143,6 @@ void SessionState::activateOutput() {
getConnection().outputTasks.activateOutput();
}
-void SessionState::giveReadCredit(int32_t credit) {
- if (isAttached())
- getConnection().outputTasks.giveReadCredit(credit);
-}
-
ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
return mgmtObject;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb 18 10:18:24 2013
@@ -99,7 +99,6 @@ class SessionState : public qpid::Sessio
/** OutputControl **/
void abort();
void activateOutput();
- void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Feb 18 10:18:24 2013
@@ -29,6 +29,7 @@
#include "qpid/framing/TypeFilter.h"
#include "qpid/framing/SendContent.h"
#include "qpid/log/Statement.h"
+#include "boost/lexical_cast.hpp"
using namespace qpid::framing;
@@ -51,7 +52,12 @@ std::string MessageTransfer::getAnnotati
{
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
if (mp && mp->hasApplicationHeaders()) {
- return mp->getApplicationHeaders().getAsString(key);
+ FieldTable::ValuePtr value = mp->getApplicationHeaders().get(key);
+ if (value) {
+ if (value->convertsTo<std::string>()) return value->get<std::string>();
+ else if (value->convertsTo<int>()) return boost::lexical_cast<std::string>(value->get<int>());
+ }
+ return std::string();
} else {
return std::string();
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/FailoverManager.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/FailoverManager.cpp Mon Feb 18 10:18:24 2013
@@ -34,6 +34,9 @@ using qpid::sys::Duration;
FailoverManager::FailoverManager(const ConnectionSettings& s,
ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {}
+FailoverManager::~FailoverManager()
+{}
+
void FailoverManager::execute(Command& c)
{
bool retry = false;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.h Mon Feb 18 10:18:24 2013
@@ -22,10 +22,12 @@
#ifndef _LoadPlugins_
#define _LoadPlugins_
+#include "qpid/client/ClientImportExport.h"
+
namespace qpid {
namespace client {
-void theModuleLoader();
+QPID_CLIENT_EXTERN void theModuleLoader();
}}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/QueueOptions.cpp Mon Feb 18 10:18:24 2013
@@ -49,8 +49,8 @@ QueueOptions::~QueueOptions()
void QueueOptions::setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount)
{
- if (maxCount) setInt(strMaxCountKey, maxCount);
- if (maxSize) setInt(strMaxSizeKey, maxSize);
+ if (maxCount) setUInt64(strMaxCountKey, maxCount);
+ if (maxSize) setUInt64(strMaxSizeKey, maxSize);
if (maxSize || maxCount){
switch (sp)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/Handler.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/Handler.h Mon Feb 18 10:18:24 2013
@@ -49,29 +49,12 @@ struct Handler {
* Functor<F>(f) will copy f.
* Functor<F&>(f) will only take a reference to x.
*/
- template <class F> class Functor : public Handler<T> {
- public:
- Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {}
- void handle(T t) { functor(t); }
- private:
- F functor;
- };
+ template <class F> class Functor;
/** Adapt a member function of X as a Handler.
* Only holds a reference to its target, not a copy.
*/
- template <class X, void (X::*F)(T)>
- class MemFunRef : public Handler<T> {
- public:
- MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
- void handle(T t) { (target->*F)(t); }
-
- /** Allow calling with -> syntax */
- MemFunRef* operator->() { return this; }
-
- private:
- X* target;
- };
+ template <class X, void (X::*F)(T)> class MemFunRef;
/** Interface for a handler that implements a
* pair of in/out handle operations.
@@ -94,7 +77,29 @@ struct Handler {
};
};
+template <class T>
+template <class F>
+class Handler<T>::Functor : public Handler<T> {
+ public:
+ Functor(F f, Handler<T>* next=0) : Handler<T>(next), functor(f) {}
+ void handle(T t) { functor(t); }
+ private:
+ F functor;
+};
+
+template <class T>
+template <class X, void (X::*F)(T)>
+class Handler<T>::MemFunRef : public Handler<T> {
+ public:
+ MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
+ void handle(T t) { (target->*F)(t); }
+ /** Allow calling with -> syntax */
+ MemFunRef* operator->() { return this; }
+
+ private:
+ X* target;
+};
}}
#endif /*!QPID_FRAMING_HANDLER_H*/
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp Mon Feb 18 10:18:24 2013
@@ -89,10 +89,10 @@ void Backup::setBrokerUrl(const Url& bro
void Backup::stop(Mutex::ScopedLock&) {
if (stopped) return;
+ stopped = true;
QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
if (replicator.get()) {
- broker.getExchanges().destroy(replicator->getName());
replicator->shutdown();
replicator.reset();
}
@@ -132,6 +132,7 @@ Role* Backup::promote() {
default:
assert(0); // Not a valid state for the Backup role..
}
+ return 0; // Keep compiler happy
}
Backup::~Backup() {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Feb 18 10:18:24 2013
@@ -242,12 +242,14 @@ class BrokerReplicator::UpdateTracker {
/** Add an exchange name */
void addExchange(Exchange::shared_ptr ex) {
- if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName());
+ if (repTest.getLevel(*ex))
+ initial.insert(ex->getName());
}
/** Add a queue name. */
void addQueue(Queue::shared_ptr q) {
- if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName());
+ if (repTest.getLevel(*q))
+ initial.insert(q->getName());
}
/** Received an event for name */
@@ -279,7 +281,7 @@ class BrokerReplicator::UpdateTracker {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+ logPrefix("Backup: "), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
@@ -287,8 +289,8 @@ BrokerReplicator::BrokerReplicator(HaBro
alternates(hb.getBroker().getExchanges()),
connection(0)
{
- broker.getConnectionObservers().add(
- boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
+ connectionObserver.reset(new ConnectionObserver(*this));
+ broker.getConnectionObservers().add(connectionObserver);
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
@@ -320,10 +322,11 @@ void BrokerReplicator::initialize() {
"", // excludes
false, // dynamic
0, // sync?
- // shared_ptr keeps this in memory until outstanding initializeBridge
+ // shared_ptr keeps this in memory until outstanding connected
// calls are run.
- boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
+ boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
+ assert(result.second);
result.first->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
@@ -339,10 +342,21 @@ void collectQueueReplicators(
}
} // namespace
-void BrokerReplicator::shutdown() {}
+void BrokerReplicator::shutdown() {
+ // NOTE: this is called in a QMF dispatch thread, not the Link's connection
+ // thread. It's OK to be unlocked because it doesn't use any mutable state,
+ // it only calls thread-safe functions on objects belonging to the Broker.
+
+ // Unregister with broker objects:
+ if (connectionObserver) {
+ broker.getConnectionObservers().remove(connectionObserver);
+ connectionObserver.reset();
+ }
+ broker.getExchanges().destroy(getName());
+}
// This is called in the connection IO thread when the bridge is started.
-void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
// Use the credentials of the outgoing Link connection for creating queues,
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
@@ -460,9 +474,7 @@ void BrokerReplicator::route(Deliverable
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
- bool autoDel = values[AUTODEL].asBool();
- bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
@@ -476,7 +488,7 @@ void BrokerReplicator::doEventQueueDecla
<< name);
deleteQueue(name);
}
- replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+ replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
values[ALTEX].asString());
}
}
@@ -494,7 +506,7 @@ void BrokerReplicator::doEventQueueDelet
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = queues.find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+ if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
if (queueTracker.get()) queueTracker->event(name);
deleteQueue(name);
@@ -503,8 +515,7 @@ void BrokerReplicator::doEventQueueDelet
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
@@ -530,7 +541,7 @@ void BrokerReplicator::doEventExchangeDe
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+ } else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -547,11 +558,12 @@ void BrokerReplicator::doEventBind(Varia
queues.find(values[QNAME].asString());
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
- // We only replicate binds for a replicated queue to replicated
- // exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // We only replicate binds for a replicated queue to replicated exchange
+ // that both exist locally. Respect the replication level set in the
+ // bind arguments, but replicate by default.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
@@ -569,8 +581,8 @@ void BrokerReplicator::doEventUnbind(Var
queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
@@ -588,9 +600,11 @@ void BrokerReplicator::doEventMembersUpd
void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
// Ignore queue replicator subscriptions.
if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
- QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
- if (qr) qr->setSubscribed();
+ if (qr) {
+ qr->setSubscribed();
+ QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+ }
}
namespace {
@@ -618,12 +632,7 @@ Variant getHaUuid(const Variant::Map& ma
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
- return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
@@ -652,7 +661,7 @@ void BrokerReplicator::doResponseQueue(V
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.replicateLevel(argsMap)) return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
if (!exchangeTracker.get())
throw Exception(QPID_MSG("Unexpected exchange response: " << values));
@@ -706,10 +715,11 @@ void BrokerReplicator::doResponseBind(Va
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- // Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // Automatically replicate binding if queue and exchange exist and are replicated.
+ // Respect replicate setting in binding args but default to replicated.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
@@ -729,8 +739,7 @@ void BrokerReplicator::doResponseHaBroke
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = replicationTest.replicateLevel(
- values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
@@ -747,7 +756,7 @@ void BrokerReplicator::doResponseHaBroke
boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+ if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!exchanges.registerExchange(qr))
@@ -772,6 +781,15 @@ void BrokerReplicator::deleteQueue(const
void BrokerReplicator::deleteExchange(const std::string& name) {
try {
+ boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+ if (!exchange) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+ return;
+ }
+ if (exchange->inUseAsAlternate()) {
+ QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
+ return;
+ }
broker.deleteExchange(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
} catch (const framing::NotFoundException&) {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h Mon Feb 18 10:18:24 2013
@@ -61,7 +61,9 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
+ * THREAD UNSAFE:
+ * All members except shutdown are only called in the Link's connection thread context.
+ * shutdown() does not use any mutable state.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -96,7 +98,7 @@ class BrokerReplicator : public broker::
class ErrorListener;
class ConnectionObserver;
- void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ void connected(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -134,13 +136,14 @@ class BrokerReplicator : public broker::
void deleteExchange(const std::string& name);
void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
+
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
std::string logPrefix;
- std::string userId, remoteHost;
ReplicationTest replicationTest;
+ std::string userId, remoteHost;
HaBroker& haBroker;
broker::Broker& broker;
broker::ExchangeRegistry& exchanges;
@@ -155,6 +158,7 @@ class BrokerReplicator : public broker::
EventDispatchMap dispatch;
std::auto_ptr<UpdateTracker> queueTracker;
std::auto_ptr<UpdateTracker> exchangeTracker;
+ boost::shared_ptr<ConnectionObserver> connectionObserver;
};
}} // namespace qpid::broker
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Feb 18 10:18:24 2013
@@ -61,7 +61,6 @@ using boost::dynamic_pointer_cast;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
settings(s),
- replicationTest(s.replicateDefault.get()),
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h Mon Feb 18 10:18:24 2013
@@ -25,7 +25,6 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
-#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -85,7 +84,6 @@ class HaBroker : public management::Mana
void shutdown(const std::string& message);
BrokerStatus getStatus() const;
- ReplicationTest getReplicationTest() const { return replicationTest; }
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
@@ -108,7 +106,6 @@ class HaBroker : public management::Mana
mutable sys::Mutex lock;
Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- ReplicationTest replicationTest;
// Independently thread-safe member variables
broker::Broker& broker;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Membership.cpp Mon Feb 18 10:18:24 2013
@@ -59,6 +59,7 @@ void Membership::add(const BrokerInfo& b
void Membership::remove(const types::Uuid& id) {
Mutex::ScopedLock l(lock);
+ if (id == self) return; // Never remove myself
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
@@ -73,7 +74,7 @@ bool Membership::contains(const types::U
void Membership::assign(const types::Variant::List& list) {
Mutex::ScopedLock l(lock);
- brokers.clear();
+ clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.cpp Mon Feb 18 10:18:24 2013
@@ -83,7 +83,8 @@ Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
- logPrefix("Primary: "), active(false)
+ logPrefix("Primary: "), active(false),
+ replicationTest(hb.getSettings().replicateDefault.get())
{
hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
@@ -97,8 +98,7 @@ Primary::Primary(HaBroker& hb, const Bro
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -196,19 +196,25 @@ void Primary::readyReplica(const Replica
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
- if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) {
+ // Set replication argument.
+ ReplicateLevel level = replicationTest.useLevel(*q);
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
+ q->addArgument(QPID_REPLICATE, printable(level).str());
+ if (level) {
// Give each queue a unique id to avoid confusion of same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
- }
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
- i->second->queueCreate(q);
- checkReady(i, l);
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ checkReady(i, l);
+ }
}
}
// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
@@ -217,16 +223,21 @@ void Primary::queueDestroy(const QueuePt
// NOTE: Called with exchange registry lock held.
void Primary::exchangeCreate(const ExchangePtr& ex) {
- if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) {
+ ReplicateLevel level = replicationTest.useLevel(*ex);
+ QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+ << " replication: " << printable(level));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
+ if (level) {
// Give each exchange a unique id to avoid confusion of same-named exchanges.
- FieldTable args = ex->getArgs();
args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
- ex->setArgs(args);
}
+ ex->setArgs(args);
}
// NOTE: Called with exchange registry lock held.
-void Primary::exchangeDestroy(const ExchangePtr&) {
+void Primary::exchangeDestroy(const ExchangePtr& ex) {
+ QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
// Do nothing
}
@@ -237,22 +248,24 @@ void Primary::opened(broker::Connection&
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(info, logPrefix << "New backup connected: " << info);
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
+ i = backups.find(info.getSystemId());
}
else {
QPID_LOG(info, logPrefix << "Known backup connected: " << info);
i->second->setConnection(&connection);
- checkReady(i, l);
}
- if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- membership.add(info);
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
+ if (i != backups.end()) checkReady(i, l);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Primary.h Mon Feb 18 10:18:24 2013
@@ -24,6 +24,7 @@
#include "types.h"
#include "BrokerInfo.h"
+#include "ReplicationTest.h"
#include "Role.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
@@ -104,6 +105,8 @@ class Primary : public Role
Membership& membership;
std::string logPrefix;
bool active;
+ ReplicationTest replicationTest;
+
/**
* Set of expected backups that must be ready before we declare ourselves
* active. These are backups that were known and ready before the primary
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Feb 18 10:18:24 2013
@@ -67,44 +67,30 @@ QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const Message& m) {
// Delay completion
QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return; // Don't record enqueues after we are cancelled.
- assert(delayed.find(m.getSequence()) == delayed.end());
- delayed[m.getSequence()] = m.getIngressCompletion();
- }
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
QPID_LOG(trace, logPrefix << "Dequeued " << m);
- ReplicatingSubscription* rs=0;
- {
- Mutex::ScopedLock l(lock);
- rs = subscription;
- }
- if (rs) rs->dequeued(m);
- complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
- for (Delayed::iterator i = begin; i != end; ++i) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
- i->second->finishCompleter();
- }
+ Mutex::ScopedLock l(lock);
+ if (subscription) subscription->dequeued(m);
+ complete(m.getSequence(), l);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- if (cancelled) return;
- cancelled = true;
- delayed.swap(removed);
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -115,37 +101,34 @@ void QueueGuard::attach(ReplicatingSubsc
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
// Those messages are already on the backup.
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- Delayed::iterator i = delayed.begin();
- while(i != delayed.end() && i->first <= position) {
- removed.insert(*i);
- delayed.erase(i++);
- }
+ Mutex::ScopedLock l(lock);
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
return position >= range.back;
}
void QueueGuard::complete(SequenceNumber sequence) {
- boost::intrusive_ptr<broker::AsyncCompletion> m;
- {
- Mutex::ScopedLock l(lock);
- // The same message can be completed twice, by
- // ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
- if (i != delayed.end()) {
- m = i->second;
- delayed.erase(i);
- }
+ Mutex::ScopedLock l(lock);
+ complete(sequence, l);
+}
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ complete(i, l);
+ delayed.erase(i);
}
- if (m) {
- QPID_LOG(trace, logPrefix << "Completed " << sequence);
- m->finishCompleter();
- }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
}
}} // namespaces qpid::ha
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Feb 18 10:18:24 2013
@@ -54,6 +54,9 @@ class ReplicatingSubscription;
* THREAD SAFE: Concurrent calls:
* - enqueued() via QueueObserver in arbitrary connection threads.
* - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *
+ * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its lock held
+ * QueueGuard MAY call ReplicatingSubscription with its lock held.
*/
class QueueGuard {
public:
@@ -104,18 +107,20 @@ class QueueGuard {
private:
class QueueObserver;
+ typedef std::map<framing::SequenceNumber,
+ boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+
+ void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+ void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
sys::Mutex lock;
bool cancelled;
std::string logPrefix;
broker::Queue& queue;
- typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
-
- void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Mon Feb 18 10:18:24 2013
@@ -34,15 +34,16 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+ const BrokerInfo& info, broker::Connection* c
) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
+ brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
{}
void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
{
- QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : ""));
queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
+ QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
+ << (createGuards ? " and guards" : ""));
}
RemoteBackup::~RemoteBackup() { cancel(); }
@@ -64,7 +65,7 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
- if (replicationTest.isReplicated(ALL, *q)) {
+ if (replicationTest.getLevel(*q) == ALL) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
catchupQueues.insert(q);
@@ -105,7 +106,7 @@ void RemoteBackup::ready(const QueuePtr&
// Called via ConfigurationObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q))
+ if (replicationTest.getLevel(*q) == ALL)
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/RemoteBackup.h Mon Feb 18 10:18:24 2013
@@ -55,7 +55,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
+ RemoteBackup(const BrokerInfo&, broker::Connection*);
~RemoteBackup();
/** Set all queues in the registry as catch-up queues.
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Feb 18 10:18:24 2013
@@ -228,15 +228,16 @@ void ReplicatingSubscription::initialize
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
- position = m.getSequence();
+bool ReplicatingSubscription::deliver(
+ const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
try {
- QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
{
Mutex::ScopedLock l(lock);
- //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+ position = m.getSequence();
- // m.getSequence() is the position of the newly enqueued message on local queue.
+ // m.getSequence() is the position of the new message on local queue.
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
@@ -252,7 +253,7 @@ bool ReplicatingSubscription::deliver(co
}
return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
+ QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
<< ": " << e.what());
throw;
}
@@ -280,7 +281,7 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
guard->complete(r.getMessageId());
// If next message is protected by the guard then we are ready
if (r.getMessageId() >= guard->getRange().back) setReady();
@@ -309,7 +310,7 @@ void ReplicatingSubscription::sendDequeu
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const Message& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
{
Mutex::ScopedLock l(lock);
dequeues.add(m.getSequence());
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Feb 18 10:18:24 2013
@@ -61,6 +61,8 @@ class QueueGuard;
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
+ * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its lock held
+ * QueueGuard MAY call ReplicatingSubscription with its lock held.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Mon Feb 18 10:18:24 2013
@@ -19,6 +19,7 @@
*
*/
#include "ReplicationTest.h"
+#include "qpid/log/Statement.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
@@ -28,53 +29,47 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
+ return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
- return replicateLevel(i->second.asString());
+ return getLevel(i->second.asString());
else
return replicateDefault;
}
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
-}
-
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ const Variant::Map& qmap(q.getSettings().original);
+ Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
+ if (i != qmap.end())
+ return getLevel(i->second.asString());
+ else
+ return getLevel(q.getSettings().storeSettings);
}
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
- return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ return getLevel(ex.getArgs());
}
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
{
- return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
+ return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
}
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex)
-{
- return replicateLevel(ex.getArgs()) >= level;
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ return ReplicationTest::getLevel(ex);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Feb 18 10:18:24 2013
@@ -48,22 +48,24 @@ class ReplicationTest
ReplicationTest(ReplicateLevel replicateDefault_) :
replicateDefault(replicateDefault_) {}
- // Return the simple replication level, accounting for defaults.
- ReplicateLevel replicateLevel(const std::string& str);
- ReplicateLevel replicateLevel(const framing::FieldTable& f);
- ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ // Get the replication level set on an object, or default if not set.
+ ReplicateLevel getLevel(const std::string& str);
+ ReplicateLevel getLevel(const framing::FieldTable& f);
+ ReplicateLevel getLevel(const types::Variant::Map& m);
+ ReplicateLevel getLevel(const broker::Queue&);
+ ReplicateLevel getLevel(const broker::Exchange&);
+
+ // Calculate level for objects that may not have replication set,
+ // including auto-delete/exclusive settings.
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const broker::Queue&);
+ ReplicateLevel useLevel(const broker::Exchange&);
- // Return true if replication for a queue is enabled at level or higher,
- // taking account of default level and queue settings.
- bool isReplicated(ReplicateLevel level,
- const types::Variant::Map& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level,
- const framing::FieldTable& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level, const broker::Queue&);
- bool isReplicated(ReplicateLevel level, const broker::Exchange&);
private:
ReplicateLevel replicateDefault;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_REPLICATIONTEST_H*/
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Role.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Role.h?rev=1447183&r1=1447182&r2=1447183&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Role.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Role.h Mon Feb 18 10:18:24 2013
@@ -25,7 +25,7 @@
#include <string>
namespace qpid {
-class Url;
+struct Url;
namespace ha {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org