You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC
svn commit: r926686 [3/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/
cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/
cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/
cpp/include/qpid/client/amqp0_10/ cpp/incl...
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 23 18:00:49 2010
@@ -120,8 +120,8 @@ class Cluster : private Cpg::Handler, pu
bool isElder() const;
- // For debugging only. Can only be called in deliver thread.
- void debugSnapshot(const char*, Connection* =0);
+ // Generates a log message for debugging purposes.
+ std::string debugSnapshot();
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -160,7 +160,6 @@ class Cluster : private Cpg::Handler, pu
const framing::Uuid& clusterId,
framing::cluster::StoreState,
const framing::Uuid& shutdownId,
- const framing::SequenceNumber& configSeq,
const std::string& firstConfig,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
@@ -181,6 +180,7 @@ class Cluster : private Cpg::Handler, pu
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void requestUpdate(Lock& );
void initMapCompleted(Lock&);
void becomeElder(Lock&);
@@ -252,7 +252,8 @@ class Cluster : private Cpg::Handler, pu
// Local cluster state, cluster map
enum {
- INIT, ///< Establishing inital cluster stattus.
+ PRE_INIT,///< Have not yet received complete initial status map.
+ INIT, ///< Waiting to reach cluster-size.
JOINER, ///< Sent update request, waiting for update offer.
UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -77,9 +77,9 @@ const std::string shadowPrefix("[shadow]
// Shadow connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId,
- const ConnectionId& id, unsigned int ssf)
+ const ConnectionId& id, const qpid::sys::SecuritySettings& external)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true),
+ connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver())
@@ -88,11 +88,11 @@ Connection::Connection(Cluster& c, sys::
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId, MemberId member,
- bool isCatchUp, bool isLink, unsigned int ssf
+ bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connectionCtor(&output, cluster.getBroker(),
mgmtId,
- ssf,
+ external,
isLink,
isCatchUp ? ++catchUpId : 0,
isCatchUp), // isCatchUp => shadow
@@ -107,7 +107,10 @@ Connection::Connection(Cluster& c, sys::
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax);
cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId());
+ ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
+ connectionCtor.external.ssf,
+ connectionCtor.external.authid,
+ connectionCtor.external.nodict), getId());
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -122,7 +125,7 @@ Connection::Connection(Cluster& c, sys::
void Connection::init() {
connection = connectionCtor.construct();
QPID_LOG(debug, cluster << " initialized connection: " << *this
- << " ssf=" << connection->getSSF());
+ << " ssf=" << connection->getExternalSecuritySettings().ssf);
if (isLocalClient()) {
// Actively send cluster-order frames from local node
connection->setClusterOrderOutput(mcastFrameHandler);
@@ -142,9 +145,11 @@ void Connection::giveReadCredit(int cred
output.giveReadCredit(credit);
}
-void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
- QPID_ASSERT(ssf == connectionCtor.ssf);
+ QPID_ASSERT(ssf == connectionCtor.external.ssf);
+ QPID_ASSERT(authid == connectionCtor.external.authid);
+ QPID_ASSERT(nodict == connectionCtor.external.nodict);
init();
}
@@ -537,7 +542,7 @@ void Connection::addQueueListener(const
void Connection::managementSchema(const std::string& data) {
management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
if (!agent)
- throw Exception(QPID_MSG("Management schema update but no management agent."));
+ throw Exception(QPID_MSG("Management schema update but management not enabled."));
framing::Buffer buf(const_cast<char*>(data.data()), data.size());
agent->importSchemas(buf);
QPID_LOG(debug, cluster << " updated management schemas");
@@ -552,7 +557,7 @@ void Connection::managementSetupState(ui
<< objectNum << " seq " << bootSequence);
management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
if (!agent)
- throw Exception(QPID_MSG("Management schema update but no management agent."));
+ throw Exception(QPID_MSG("Management schema update but management not enabled."));
agent->setNextObjectId(objectNum);
agent->setBootSequence(bootSequence);
}
@@ -560,7 +565,7 @@ void Connection::managementSetupState(ui
void Connection::managementAgents(const std::string& data) {
management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
if (!agent)
- throw Exception(QPID_MSG("Management agents update but no management agent."));
+ throw Exception(QPID_MSG("Management agent update but management not enabled."));
framing::Buffer buf(const_cast<char*>(data.data()), data.size());
agent->importAgents(buf);
QPID_LOG(debug, cluster << " updated management agents");
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h Tue Mar 23 18:00:49 2010
@@ -34,6 +34,7 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/FrameDecoder.h"
@@ -66,10 +67,10 @@ class Connection :
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
- unsigned int ssf);
+ const qpid::sys::SecuritySettings& external);
/** Shadow connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
- unsigned int ssf);
+ const qpid::sys::SecuritySettings& external);
~Connection();
ConnectionId getId() const { return self; }
@@ -163,7 +164,7 @@ class Connection :
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
- void announce(const std::string& mgmtId, uint32_t ssf);
+ void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict);
void abort();
void deliverClose();
@@ -174,7 +175,7 @@ class Connection :
void managementAgents(const std::string& data);
void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
- uint32_t getSsf() const { return connectionCtor.ssf; }
+ //uint32_t getSsf() const { return connectionCtor.external.ssf; }
private:
struct NullFrameHandler : public framing::FrameHandler {
@@ -186,7 +187,7 @@ class Connection :
sys::ConnectionOutputHandler* out;
broker::Broker& broker;
std::string mgmtId;
- unsigned int ssf;
+ qpid::sys::SecuritySettings external;
bool isLink;
uint64_t objectId;
bool shadow;
@@ -195,17 +196,17 @@ class Connection :
sys::ConnectionOutputHandler* out_,
broker::Broker& broker_,
const std::string& mgmtId_,
- unsigned int ssf_,
+ const qpid::sys::SecuritySettings& external_,
bool isLink_=false,
uint64_t objectId_=0,
bool shadow_=false
- ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
+ ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
isLink(isLink_), objectId(objectId_), shadow(shadow_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
- new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId, shadow));
+ new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
}
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Mar 23 18:00:49 2010
@@ -37,26 +37,26 @@ using namespace framing;
sys::ConnectionCodec*
ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
- unsigned int ssf) {
+ const qpid::sys::SecuritySettings& external) {
if (v == ProtocolVersion(0, 10))
- return new ConnectionCodec(v, out, id, cluster, false, false, ssf);
+ return new ConnectionCodec(v, out, id, cluster, false, false, external);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false, ssf);
+ return new ConnectionCodec(v, out, id, cluster, true, false, external);
return 0;
}
// Used for outgoing Link connections
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId,
- unsigned int ssf) {
- return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, ssf);
+ const qpid::sys::SecuritySettings& external) {
+ return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, external);
}
ConnectionCodec::ConnectionCodec(
const ProtocolVersion& v, sys::OutputControl& out,
- const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, unsigned int ssf
+ const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external
) : codec(out, logId, isLink),
- interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, ssf))
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, external))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Mar 23 18:00:49 2010
@@ -53,14 +53,14 @@ class ConnectionCodec : public sys::Conn
Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
: next(f), cluster(c) {}
sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ const qpid::sys::SecuritySettings& external);
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ const qpid::sys::SecuritySettings& external);
};
ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
const std::string& logId, Cluster& c, bool catchUp, bool isLink,
- unsigned int ssf);
+ const qpid::sys::SecuritySettings& external);
~ConnectionCodec();
// ConnectionCodec functions.
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Mar 23 18:00:49 2010
@@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(co
}
bool InitialStatusMap::isComplete() const {
- return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
- && (map.size() >= size);
+ return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
}
bool InitialStatusMap::transitionToComplete() {
@@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded()
return ret;
}
-bool InitialStatusMap::isActive(const Map::value_type& v) {
+bool InitialStatusMap::isActiveEntry(const Map::value_type& v) {
return v.second && v.second->getActive();
}
@@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Ma
v.second->getStoreState() == STORE_STATE_DIRTY_STORE);
}
+bool InitialStatusMap::isActive() {
+ assert(isComplete());
+ return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end());
+}
+
bool InitialStatusMap::isUpdateNeeded() {
assert(isComplete());
// We need an update if there are any active members.
- if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+ if (isActive()) return true;
// Otherwise it depends on store status, get my own status:
Map::iterator me = map.find(self);
@@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders()
Uuid InitialStatusMap::getClusterId() {
assert(isComplete());
assert(!map.empty());
- Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+ Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry);
if (i != map.end())
return i->second->getClusterId(); // An active member
else
@@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent()
Uuid clusterId;
Uuid shutdownId;
+ bool initialCluster = !isActive();
for (Map::iterator i = map.begin(); i != map.end(); ++i) {
assert(i->second);
if (i->second->getActive()) ++active;
@@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent()
++clean;
checkId(clusterId, i->second->getClusterId(),
"Cluster-ID mismatch. Stores belong to different clusters.");
- checkId(shutdownId, i->second->getShutdownId(),
- "Shutdown-ID mismatch. Stores were not shut down together");
+ // Only need shutdownId to match if we are in an initially forming cluster.
+ if (initialCluster)
+ checkId(shutdownId, i->second->getShutdownId(),
+ "Shutdown-ID mismatch. Stores were not shut down together");
break;
}
}
@@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent()
if (none && (clean+dirty+empty))
throw Exception("Mixing transient and persistent brokers in a cluster");
- // If there are no active members and there are dirty stores there
- // must be at least one clean store.
- if (!active && dirty && !clean)
- throw Exception("Cannot recover, no clean store.");
+ if (map.size() >= size) {
+ // All initial members are present. If there are no active
+ // members and there are dirty stores there must be at least
+ // one clean store.
+ if (!active && dirty && !clean)
+ throw Exception("Cannot recover, no clean store.");
+ }
}
std::string InitialStatusMap::getFirstConfigStr() const {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Mar 23 18:00:49 2010
@@ -31,6 +31,11 @@ namespace cluster {
/**
* Track status of cluster members during initialization.
+ *
+ * When a new member joins the CPG cluster, all members send an initial-status
+ * control. This map tracks those controls and provides data to make decisions
+ * about joining the cluster.
+ *
*/
class InitialStatusMap
{
@@ -38,7 +43,7 @@ class InitialStatusMap
typedef framing::ClusterInitialStatusBody Status;
InitialStatusMap(const MemberId& self, size_t size);
- /** Process a config change. @return true if we need to re-send our status */
+ /** Process a config change. May make isResendNeeded() true. */
void configChange(const MemberSet& newConfig);
/** @return true if we need to re-send status */
bool isResendNeeded();
@@ -46,13 +51,19 @@ class InitialStatusMap
/** Process received status */
void received(const MemberId&, const Status& is);
- /**@return true if the map is complete. */
+ /**@return true if the map has an entry for all current cluster members. */
bool isComplete() const;
+
+ size_t getActualSize() const { return map.size(); }
+ size_t getRequiredSize() const { return size; }
+
/**@return true if the map was completed by the last config change or received. */
bool transitionToComplete();
/**@pre isComplete(). @return this node's elders */
MemberSet getElders() const;
- /**@pre isComplete(). @return True if we need an update. */
+ /**@pre isComplete(). @return True if there are active members of the cluster. */
+ bool isActive();
+ /**@pre isComplete(). @return True if we need to request an update. */
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
framing::Uuid getClusterId();
@@ -66,8 +77,9 @@ class InitialStatusMap
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
- static bool isActive(const Map::value_type&);
+ static bool isActiveEntry(const Map::value_type&);
static bool hasStore(const Map::value_type&);
+
Map map;
MemberSet firstConfig;
MemberId self;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Mar 23 18:00:49 2010
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- ready(false)
-{
- queue.start();
-}
+ ready(false), bypass(true)
+{}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e)
}
}
QPID_LOG(trace, "MCAST " << e);
- queue.push(e);
+ if (bypass) { // direct, don't queue
+ iovec iov = e.toIovec();
+ // FIXME aconway 2010-03-10: should do limited retry.
+ while (!cpg.mcast(&iov, 1))
+ ;
+ }
+ else
+ queue.push(e);
}
-
Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::
}
}
+void Multicaster::start() {
+ queue.start();
+ bypass = false;
+}
+
void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
ready = true;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Mar 23 18:00:49 2010
@@ -41,16 +41,18 @@ class Cpg;
/**
* Multicast to the cluster. Shared, thread safe object.
- *
- * Runs in two modes;
*
- * initializing: Hold connection mcast events. Multicast cluster
- * events directly in the calling thread. This mode is used before
- * joining the cluster where the poller may not yet be active and we
- * want to hold any connection traffic till we join.
+ * holding mode: Hold connection events for later multicast. Cluster
+ * events are never held. Used during PRE_INIT/INIT state when we
+ * want to hold any connection traffic till we are ready in the
+ * cluster.
+ *
+ * bypass mode: Multicast cluster events directly in the calling
+ * thread. This mode is used by cluster in PRE_INIT state, when the poller
+ * is not yet active.
*
- * ready: normal operation. Queues all mcasts on a pollable queue,
- * multicasts connection and cluster events.
+ * Multicaster is created in bypass+holding mode; these are disabled by
+ * start() and setReady() respectively.
*/
class Multicaster
{
@@ -65,7 +67,9 @@ class Multicaster
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
- /** Switch to ready mode. */
+ /** Start the pollable queue, turn off bypass mode. */
+ void start();
+ /** Switch to ready mode, release held messages. */
void setReady();
private:
@@ -81,6 +85,7 @@ class Multicaster
bool ready;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
+ bool bypass;
};
}} // namespace qpid::cluster
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Mar 23 18:00:49 2010
@@ -31,6 +31,13 @@ namespace cluster {
/**
* More convenient version of PollableQueue that handles iterating
* over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disables bypass mode.
*/
template <class T> class PollableQueue : public sys::PollableQueue<T> {
public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue :
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg)
+ callback(f), error(err), message(msg), bypass(true)
{}
typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue :
}
}
+ void push(const T& t) {
+ if (bypass) callback(t);
+ else sys::PollableQueue<T>::push(t);
+ }
+
+ void start() {
+ bypass = false;
+ sys::PollableQueue<T>::start();
+ }
+
private:
Callback callback;
ErrorCallback error;
std::string message;
+ bool bypass;
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Tue Mar 23 18:00:49 2010
@@ -21,6 +21,7 @@
#include "StoreStatus.h"
#include "qpid/Exception.h"
#include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
@@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) {
Uuid ret;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> ret;
+ try {
+ throw_exceptions(i);
+ i >> ret;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return ret;
}
void saveUuid(const fs::path& path, const Uuid& uuid) {
fs::ofstream o(path);
- throw_exceptions(o);
- o << uuid;
+ try {
+ throw_exceptions(o);
+ o << uuid;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
+ throw;
+ }
}
framing::SequenceNumber loadSeqNum(const fs::path& path) {
uint32_t n = 0;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> n;
+ try {
+ throw_exceptions(i);
+ i >> n;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return framing::SequenceNumber(n);
}
@@ -105,9 +121,14 @@ void StoreStatus::save() {
create_directory(dir);
saveUuid(dir/CLUSTER_ID_FILE, clusterId);
saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
- fs::ofstream o(dir/CONFIG_SEQ_FILE);
- throw_exceptions(o);
- o << configSeq.getValue();
+ try {
+ fs::ofstream o(dir/CONFIG_SEQ_FILE);
+ throw_exceptions(o);
+ o << configSeq.getValue();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what());
+ throw;
+ }
}
catch (const std::exception&e) {
throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar 23 18:00:49 2010
@@ -158,6 +158,12 @@ void UpdateClient::update() {
connection.close();
QPID_LOG(debug, updaterId << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
+ // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // described in https://bugzilla.redhat.com/show_bug.cgi?id=568831.
+ // It allows the connection to fully close before destroying the
+ // Connection object. Remove when the bug is fixed.
+ //
+ sys::usleep(10*1000); // 10ms
}
namespace {
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp Tue Mar 23 18:00:49 2010
@@ -130,6 +130,21 @@ Str16Value::Str16Value(const std::string
reinterpret_cast<const uint8_t*>(v.data()+v.size())))
{}
+Var16Value::Var16Value(const std::string& v, uint8_t code) :
+ FieldValue(
+ code,
+ new VariableWidthValue<2>(
+ reinterpret_cast<const uint8_t*>(v.data()),
+ reinterpret_cast<const uint8_t*>(v.data()+v.size())))
+{}
+Var32Value::Var32Value(const std::string& v, uint8_t code) :
+ FieldValue(
+ code,
+ new VariableWidthValue<4>(
+ reinterpret_cast<const uint8_t*>(v.data()),
+ reinterpret_cast<const uint8_t*>(v.data()+v.size())))
+{}
+
Struct32Value::Struct32Value(const std::string& v) :
FieldValue(
0xAB,
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Mar 23 18:00:49 2010
@@ -96,11 +96,13 @@ ManagementAgent::~ManagementAgent ()
Mutex::ScopedLock lock (userLock);
// Reset the shared pointers to exchanges. If this is not done now, the exchanges
- // will stick around until dExchange and mExchange are implicitely destroyed (long
+ // will stick around until dExchange and mExchange are implicitly destroyed (long
// after this destructor completes). Those exchanges hold references to management
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ v2Topic.reset();
+ v2Direct.reset();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -183,13 +185,20 @@ void ManagementAgent::writeData ()
}
}
-void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
+{
+ v2Topic = _texchange;
+ v2Direct = _dexchange;
+}
+
void ManagementAgent::registerClass (const string& packageName,
const string& className,
uint8_t* md5Sum,
@@ -210,22 +219,19 @@ void ManagementAgent::registerEvent (con
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
-
// Deprecated:
-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool publishNow)
+ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId)
{
// always force object to generate key string
- return addObject(object, std::string(), persistId != 0, publishNow);
+ return addObject(object, std::string(), persistId != 0);
}
ObjectId ManagementAgent::addObject(ManagementObject* object,
const std::string& key,
- bool persistent,
- bool publishNow)
+ bool persistent)
{
- Mutex::ScopedLock lock (addLock);
uint16_t sequence;
sequence = persistent ? 0 : bootSequence;
@@ -238,45 +244,21 @@ ObjectId ManagementAgent::addObject(Mana
}
object->setObjectId(objId);
- ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
- if (destIter != newManagementObjects.end()) {
- if (destIter->second->isDeleted()) {
- newDeletedManagementObjects.push_back(destIter->second);
- newManagementObjects.erase(destIter);
- } else {
- QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
- " key=" << objId.getV2Key());
- return objId;
- }
- }
- newManagementObjects[objId] = object;
-
- if (publishNow) {
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
-
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- object->mapEncodeValues(values, true, false); // send props only
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
- content.encode();
- stringstream key;
- key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
- QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
+ {
+ Mutex::ScopedLock lock (addLock);
+ ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+ if (destIter != newManagementObjects.end()) {
+ if (destIter->second->isDeleted()) {
+ newDeletedManagementObjects.push_back(destIter->second);
+ newManagementObjects.erase(destIter);
+ } else {
+ QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+ " key=" << objId.getV2Key());
+ return objId;
+ }
+ }
+ newManagementObjects[objId] = object;
}
return objId;
@@ -350,11 +332,11 @@ void ManagementAgent::clientAdded (const
}
void ManagementAgent::clusterUpdate() {
- // Called on all cluster memebesr when a new member joins a cluster.
+ // Called on all cluster members when a new member joins a cluster.
// Set clientWasAdded so that on the next periodicProcessing we will do
// a full update on all cluster members.
clientWasAdded = true;
- debugSnapshot("update");
+ QPID_LOG(debug, "cluster update " << debugSnapshot());
}
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -670,7 +652,7 @@ void ManagementAgent::periodicProcessing
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
- debugSnapshot("periodic");
+ QPID_LOG(debug, "periodic update " << debugSnapshot());
}
void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
@@ -1271,7 +1253,7 @@ void ManagementAgent::handleAttachReques
agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
- addObject (agent->mgmtObject, 0, true);
+ addObject (agent->mgmtObject, 0);
remoteAgents[connectionRef] = agent;
QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
@@ -1893,13 +1875,13 @@ size_t ManagementAgent::validateEventSch
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
- Mutex::ScopedLock lock (addLock);
+ Mutex::ScopedLock lock (userLock);
allocator = a;
}
uint64_t ManagementAgent::allocateId(Manageable* object)
{
- Mutex::ScopedLock lock (addLock);
+ Mutex::ScopedLock lock (userLock);
if (allocator.get()) return allocator->getIdFor(object);
return 0;
}
@@ -2031,7 +2013,7 @@ void ManagementAgent::importSchemas(qpid
void ManagementAgent::RemoteAgent::mapEncode(qpid::messaging::Variant::Map& map_) const {
::qpid::messaging::VariantMap _objId, _values;
-
+
map_["_brokerBank"] = brokerBank;
map_["_agentBank"] = agentBank;
map_["_routingKey"] = routingKey;
@@ -2068,10 +2050,10 @@ void ManagementAgent::RemoteAgent::mapDe
mgmtObject->mapDecodeValues(i->second.asMap());
}
- agent.addObject(mgmtObject, 0, true);
+ // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
+ mgmtObject->set_connectionRef(connectionRef);
}
-
void ManagementAgent::exportAgents(std::string& out) {
::qpid::messaging::Message m;
::qpid::messaging::ListContent content(m);
@@ -2082,15 +2064,12 @@ void ManagementAgent::exportAgents(std::
i != remoteAgents.end();
++i)
{
- ObjectId id = i->first;
+ // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
RemoteAgent* agent = i->second;
map_.clear();
- omap.clear();
amap.clear();
- id.mapEncode(omap);
- map_["_object_id"] = omap;
agent->mapEncode(amap);
map_["_remote_agent"] = amap;
list_.push_back(map_);
@@ -2123,16 +2102,16 @@ void ManagementAgent::importAgents(qpid:
}
}
-void ManagementAgent::debugSnapshot(const char* type) {
+std::string ManagementAgent::debugSnapshot() {
std::ostringstream msg;
- msg << type << " snapshot, agents:";
+ msg << " management snapshot:";
for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
i != remoteAgents.end(); ++i)
msg << " " << i->second->routingKey;
msg << " packages: " << packages.size();
msg << " objects: " << managementObjects.size();
msg << " new objects: " << newManagementObjects.size();
- QPID_LOG(trace, msg.str());
+ return msg.str();
}
qpid::messaging::Variant::Map ManagementAgent::toMap(const FieldTable& from)
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Mar 23 18:00:49 2010
@@ -75,9 +75,12 @@ public:
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
- void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
- qpid::broker::Exchange::shared_ptr directExchange);
+ void setInterval(uint16_t _interval) { interval = _interval; }
+ void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+ void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+
int getMaxThreads () { return threadPoolSize; }
QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
const std::string& className,
@@ -88,12 +91,10 @@ public:
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0,
- bool publishNow = false);
+ uint64_t persistId = 0);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
const std::string& key,
- bool persistent = true,
- bool publishNow = false);
+ bool persistent = true);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
@@ -238,10 +239,18 @@ private:
ManagementObjectVector newDeletedManagementObjects;
framing::Uuid uuid;
- sys::Mutex addLock;
- sys::Mutex userLock;
+
+ //
+ // Lock hierarchy: If a thread needs to take both addLock and userLock,
+ // it MUST take userLock first, then addLock.
+ //
+ sys::Mutex userLock;
+ sys::Mutex addLock;
+
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
+ qpid::broker::Exchange::shared_ptr v2Topic;
+ qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
@@ -320,7 +329,7 @@ private:
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
- void debugSnapshot(const char*);
+ std::string debugSnapshot();
};
}}
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp Tue Mar 23 18:00:49 2010
@@ -198,6 +198,7 @@ bool AddressParser::readSimpleValue(Vari
std::string s;
if (readWord(s)) {
value = s;
+ try { value = value.asInt32(); return true; } catch (const InvalidConversion&) {}
try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
return true;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -23,23 +23,17 @@
#include "qpid/messaging/ConnectionImpl.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/SessionImpl.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/log/Statement.h"
namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Connection> PI;
-
-}
-
namespace messaging {
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Connection> PI;
Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); }
-Connection::Connection(const Connection& c) : qpid::client::Handle<ConnectionImpl>() { PI::copy(*this, c); }
+Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); }
Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
Connection::~Connection() { PI::dtor(*this); }
@@ -67,40 +61,11 @@ Session Connection::newSession(bool tran
return impl->newSession(transactional, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
-
-InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
-
-void parseKeyValuePair(const std::string& in, Variant::Map& out)
-{
- std::string::size_type i = in.find('=');
- if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) {
- throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in));
- } else {
- out[in.substr(0, i)] = in.substr(i+1);
- }
-}
-
-void parseOptionString(const std::string& in, Variant::Map& out)
-{
- std::string::size_type start = 0;
- std::string::size_type i = in.find('&');
- while (i != std::string::npos) {
- parseKeyValuePair(in.substr(start, i-start), out);
- if (i < in.size()) {
- start = i+1;
- i = in.find('&', start);
- } else {
- i = std::string::npos;
- }
- }
- parseKeyValuePair(in.substr(start), out);
+void Connection::setOption(const std::string& name, const Variant& value)
+{
+ impl->setOption(name, value);
}
-Variant::Map parseOptionString(const std::string& in)
-{
- Variant::Map map;
- parseOptionString(in, map);
- return map;
-}
+InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
}} // namespace qpid::messaging
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Tue Mar 23 18:00:49 2010
@@ -25,12 +25,10 @@
#include "qpid/RefCounted.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
class Session;
+class Variant;
class ConnectionImpl : public virtual qpid::RefCounted
{
@@ -40,6 +38,7 @@ class ConnectionImpl : public virtual qp
virtual void close() = 0;
virtual Session newSession(bool transactional, const std::string& name) = 0;
virtual Session getSession(const std::string& name) const = 0;
+ virtual void setOption(const std::string& name, const Variant& value) = 0;
private:
};
}} // namespace qpid::messaging
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp Tue Mar 23 18:00:49 2010
@@ -22,21 +22,15 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Receiver> PI;
-
-}
-
namespace messaging {
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Receiver> PI;
Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); }
-Receiver::Receiver(const Receiver& s) : qpid::client::Handle<ReceiverImpl>() { PI::copy(*this, s); }
+Receiver::Receiver(const Receiver& s) : Handle<ReceiverImpl>() { PI::copy(*this, s); }
Receiver::~Receiver() { PI::dtor(*this); }
Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Tue Mar 23 18:00:49 2010
@@ -24,9 +24,6 @@
#include "qpid/RefCounted.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
class Message;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp Tue Mar 23 18:00:49 2010
@@ -22,21 +22,14 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/SenderImpl.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Sender> PI;
-
-}
-
namespace messaging {
-
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Sender> PI;
Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); }
-Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); }
+Sender::Sender(const Sender& s) : qpid::messaging::Handle<SenderImpl>() { PI::copy(*this, s); }
Sender::~Sender() { PI::dtor(*this); }
Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
void Sender::send(const Message& message) { impl->send(message); }
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h Tue Mar 23 18:00:49 2010
@@ -24,9 +24,6 @@
#include "qpid/RefCounted.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
class Message;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp Tue Mar 23 18:00:49 2010
@@ -25,21 +25,15 @@
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/SessionImpl.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Session> PI;
-
-}
-
namespace messaging {
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Session> PI;
Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); }
-Session::Session(const Session& s) : qpid::client::Handle<SessionImpl>() { PI::copy(*this, s); }
+Session::Session(const Session& s) : Handle<SessionImpl>() { PI::copy(*this, s); }
Session::~Session() { PI::dtor(*this); }
Session& Session::operator=(const Session& s) { return PI::assign(*this, s); }
void Session::commit() { impl->commit(); }
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h Tue Mar 23 18:00:49 2010
@@ -26,9 +26,6 @@
#include "qpid/messaging/Duration.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
class Address;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp Tue Mar 23 18:00:49 2010
@@ -20,6 +20,7 @@
*/
#include "qpid/messaging/Variant.h"
#include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <algorithm>
@@ -50,7 +51,7 @@ class VariantImpl
VariantImpl(int64_t);
VariantImpl(float);
VariantImpl(double);
- VariantImpl(const std::string&);
+ VariantImpl(const std::string&, const std::string& encoding=std::string());
VariantImpl(const Variant::Map&);
VariantImpl(const Variant::List&);
VariantImpl(const Uuid&);
@@ -130,7 +131,8 @@ VariantImpl::VariantImpl(int32_t i) : ty
VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; }
VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; }
VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; }
-VariantImpl::VariantImpl(const std::string& s) : type(VAR_STRING) { value.v = new std::string(s); }
+VariantImpl::VariantImpl(const std::string& s, const std::string& e)
+ : type(VAR_STRING), encoding(e) { value.v = new std::string(s); }
VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new Variant::Map(m); }
VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); }
VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); }
@@ -448,7 +450,7 @@ VariantImpl* VariantImpl::create(const V
case VAR_INT64: return new VariantImpl(v.asInt64());
case VAR_FLOAT: return new VariantImpl(v.asFloat());
case VAR_DOUBLE: return new VariantImpl(v.asDouble());
- case VAR_STRING: return new VariantImpl(v.asString());
+ case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding());
case VAR_MAP: return new VariantImpl(v.asMap());
case VAR_LIST: return new VariantImpl(v.asList());
case VAR_UUID: return new VariantImpl(v.asUuid());
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Mar 23 18:00:49 2010
@@ -22,6 +22,7 @@
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -144,7 +145,7 @@ void AsynchIOHandler::readbuff(AsynchIO&
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, 0);
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
@@ -200,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&)
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, 0);
+ codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h Tue Mar 23 18:00:49 2010
@@ -30,6 +30,7 @@ namespace sys {
class InputHandlerFactory;
class OutputControl;
+struct SecuritySettings;
/**
* Interface of coder/decoder for a connection of a specific protocol
@@ -49,27 +50,15 @@ class ConnectionCodec : public Codec {
struct Factory {
virtual ~Factory() {}
- /** Security Strength Factor - indicates the level of security provided
- * by the underlying transport. If zero, the transport provides no
- * security (e.g. TCP). If non-zero, the transport provides some level
- * of security (e.g. SSL). The values for SSF can be interpreted as:
- *
- * 0 = No protection.
- * 1 = Integrity checking only.
- * >1 = Supports authentication, integrity and confidentiality.
- * The number represents the encryption key length.
- */
-
/** Return 0 if version unknown */
virtual ConnectionCodec* create(
framing::ProtocolVersion, OutputControl&, const std::string& id,
- unsigned int conn_ssf
+ const SecuritySettings&
) = 0;
/** Return "preferred" codec for outbound connections. */
virtual ConnectionCodec* create(
- OutputControl&, const std::string& id,
- unsigned int conn_ssf
+ OutputControl&, const std::string& id, const SecuritySettings&
) = 0;
};
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Mar 23 18:00:49 2010
@@ -27,6 +27,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SecuritySettings.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
@@ -139,7 +140,7 @@ void RdmaIOHandler::initProtocolOut() {
// but we must be able to send
assert( codec == 0 );
assert( aio->writable() && aio->bufferAvailable() );
- codec = factory->create(*this, identifier, 0);
+ codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
}
@@ -186,7 +187,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
decoded = in.getPosition();
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
- codec = factory->create(protocolInit.getVersion(), *this, identifier, 0);
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
// If we failed to create the codec then we don't understand the offered protocol version
if (!codec) {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Mar 23 18:00:49 2010
@@ -41,14 +41,18 @@ struct SslServerOptions : ssl::SslOption
{
uint16_t port;
bool clientAuth;
+ bool nodict;
SslServerOptions() : port(5671),
- clientAuth(false)
+ clientAuth(false),
+ nodict(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
("ssl-require-client-authentication", optValue(clientAuth),
- "Forces clients to authenticate in order to establish an SSL connection");
+ "Forces clients to authenticate in order to establish an SSL connection")
+ ("ssl-sasl-no-dict", optValue(nodict),
+ "Disables SASL mechanisms that are vulnerable to passive dictionary-based password attacks");
}
};
@@ -57,6 +61,7 @@ class SslProtocolFactory : public Protoc
qpid::sys::ssl::SslSocket listener;
const uint16_t listeningPort;
std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+ bool nodict;
public:
SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
@@ -97,7 +102,8 @@ static struct SslPlugin : public Plugin
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- opts.connectionBacklog, opts.tcpNoDelay));
+ opts.connectionBacklog,
+ opts.tcpNoDelay));
QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
@@ -109,12 +115,13 @@ static struct SslPlugin : public Plugin
} sslPlugin;
SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth))
+ tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
+ nodict(options.nodict)
{}
void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
ConnectionCodec::Factory* f, bool isClient) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f);
+ qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f, nodict);
if (tcpNoDelay) {
s.setTcpNoDelay(tcpNoDelay);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp Tue Mar 23 18:00:49 2010
@@ -42,13 +42,14 @@ struct Buff : public SslIO::BufferBase {
{ delete [] bytes;}
};
-SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f) :
+SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
identifier(id),
aio(0),
factory(f),
codec(0),
readError(false),
- isClient(false)
+ isClient(false),
+ nodict(_nodict)
{}
SslHandler::~SslHandler() {
@@ -111,7 +112,7 @@ void SslHandler::readbuff(SslIO& , SslIO
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, aio->getKeyLen());
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio));
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
@@ -166,7 +167,7 @@ void SslHandler::nobuffs(SslIO&) {
void SslHandler::idle(SslIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, aio->getKeyLen());
+ codec = factory->create(*this, identifier, getSecuritySettings(aio));
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
@@ -183,5 +184,12 @@ void SslHandler::idle(SslIO&){
aio->queueWriteClose();
}
+SecuritySettings SslHandler::getSecuritySettings(SslIO* aio)
+{
+ SecuritySettings settings = aio->getSecuritySettings();
+ settings.nodict = nodict;
+ return settings;
+}
+
}}} // namespace qpid::sys::ssl
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h Tue Mar 23 18:00:49 2010
@@ -45,11 +45,13 @@ class SslHandler : public OutputControl
ConnectionCodec* codec;
bool readError;
bool isClient;
+ bool nodict;
void write(const framing::ProtocolInitiation&);
+ qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
public:
- SslHandler(std::string id, ConnectionCodec::Factory* f);
+ SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
~SslHandler();
void init(SslIO* a, int numBuffs);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Tue Mar 23 18:00:49 2010
@@ -436,4 +436,9 @@ void SslIO::close(DispatchHandle& h) {
}
}
-int SslIO::getKeyLen() {return socket.getKeyLen();}
+/**
+ * Builds a SecuritySettings value from the underlying socket: ssf is taken
+ * from socket.getKeyLen() and authid from socket.getClientAuthId().
+ */
+SecuritySettings SslIO::getSecuritySettings() {
+    SecuritySettings result;
+    result.ssf = socket.getKeyLen();
+    result.authid = socket.getClientAuthId();
+    return result;
+}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h Tue Mar 23 18:00:49 2010
@@ -22,6 +22,7 @@
*/
#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/SecuritySettings.h"
#include <boost/function.hpp>
#include <deque>
@@ -156,7 +157,7 @@ public:
bool writeQueueEmpty() { return writeQueue.empty(); }
BufferBase* getQueuedBuffer();
- int getKeyLen();
+ qpid::sys::SecuritySettings getSecuritySettings();
private:
~SslIO();
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp Tue Mar 23 18:00:49 2010
@@ -102,6 +102,34 @@ std::string getService(int fd, bool loca
return servName;
}
+const std::string DOMAIN_SEPARATOR("@");
+const std::string DC_SEPARATOR(".");
+const std::string DC("DC");
+const std::string DN_DELIMS(" ,=");
+
+/**
+ * Assembles a domain name from the DC components of a certificate subject.
+ *
+ * The subject is split into tokens on the characters in DN_DELIMS. Each
+ * token that immediately follows a token equal to DC is appended to the
+ * result, with DC_SEPARATOR placed between successive components.
+ *
+ * @param subject subject string to scan.
+ * @return the joined components, or an empty string if none were found.
+ */
+std::string getDomainFromSubject(std::string subject)
+{
+    std::string result;
+    bool expectDcValue = false;
+
+    // 'begin' always points at the first character of the current token;
+    // it becomes npos once only delimiters (or nothing) remain.
+    std::string::size_type begin = subject.find_first_not_of(DN_DELIMS);
+    while (begin != std::string::npos) {
+        // 'end' is npos for the final token; substr() then takes the rest.
+        const std::string::size_type end = subject.find_first_of(DN_DELIMS, begin);
+        const std::string word = subject.substr(begin, end - begin);
+
+        if (expectDcValue) {
+            if (!result.empty()) result += DC_SEPARATOR;
+            result += word;
+            expectDcValue = false;
+        } else if (word == DC) {
+            expectDcValue = true;
+        }
+
+        begin = subject.find_first_not_of(DN_DELIMS, end);
+    }
+    return result;
+}
+
}
SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
@@ -294,4 +322,25 @@ int SslSocket::getKeyLen() const
return 0;
}
+/**
+ * Builds an authentication id for the peer from its SSL certificate.
+ *
+ * The id is the certificate's common name; when the subject contains DC
+ * components, DOMAIN_SEPARATOR and the assembled domain are appended.
+ *
+ * @return the id, or an empty string when the peer presented no
+ *         certificate or the certificate carries no common name.
+ */
+std::string SslSocket::getClientAuthId() const
+{
+    std::string authId;
+    CERTCertificate* cert = SSL_PeerCertificate(socket);
+    if (cert) {
+        // CERT_GetCommonName yields a null pointer when the subject has no
+        // CN; constructing a std::string from it would be undefined.
+        char* commonName = CERT_GetCommonName(&(cert->subject));
+        if (commonName) {
+            authId = commonName;
+            // The buffer was allocated by NSS for the caller; release it
+            // now that it has been copied.
+            PORT_Free(commonName);
+            /*
+             * The NSS function CERT_GetDomainComponentName only returns
+             * the last component of the domain name, so we have to parse
+             * the subject manually to extract the full domain.
+             */
+            if (cert->subjectName) {
+                std::string domain = getDomainFromSubject(cert->subjectName);
+                if (!domain.empty()) {
+                    authId += DOMAIN_SEPARATOR;
+                    authId += domain;
+                }
+            }
+        }
+        CERT_DestroyCertificate(cert);
+    }
+    return authId;
+}
+
}}} // namespace qpid::sys::ssl
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h Tue Mar 23 18:00:49 2010
@@ -101,6 +101,7 @@ public:
int getError() const;
int getKeyLen() const;
+ std::string getClientAuthId() const;
private:
mutable std::string connectname;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp Tue Mar 23 18:00:49 2010
@@ -34,6 +34,10 @@
#include <xercesc/framework/MemBufInputSource.hpp>
+#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
+#include <xqilla/ast/XQEffectiveBooleanValue.hpp>
+#endif
+
#include <xqilla/ast/XQGlobalVariable.hpp>
#include <xqilla/context/ItemFactory.hpp>
@@ -51,7 +55,7 @@ namespace qpid {
namespace broker {
- XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
+XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -180,7 +184,13 @@ bool XmlExchange::matches(Query& query,
}
Result result = query->execute(context.get());
+#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
+ Item::Ptr first_ = result->next(context.get());
+ Item::Ptr second_ = result->next(context.get());
+ return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
+#else
return result->getEffectiveBooleanValue(context.get(), 0);
+#endif
}
catch (XQException& e) {
QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp Tue Mar 23 18:00:49 2010
@@ -130,11 +130,11 @@ void ClusterFixture::kill(size_t n, int
forkedBrokers[n]->kill(sig);
}
-/** Kill a broker and suppressing errors from closing connection c. */
+/** Kill a broker and suppress errors from closing connection c. */
void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) {
ScopedSuppressLogging sl;
- kill(n,sig);
try { c.close(); } catch(...) {}
+ kill(n,sig);
}
/**
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Mar 23 18:00:49 2010
@@ -37,19 +37,19 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTes
typedef InitialStatusMap::Status Status;
Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
- return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0,
+ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(),
encodeMemberSet(ms));
}
Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
- return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0,
+ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(),
encodeMemberSet(ms));
}
Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(),
const MemberSet& ms=MemberSet())
{
- return Status(ProtocolVersion(), 0, active, start, state, stop, 0,
+ return Status(ProtocolVersion(), 0, active, start, state, stop,
encodeMemberSet(ms));
}
@@ -173,20 +173,6 @@ QPID_AUTO_TEST_CASE(testInteveningConfig
BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
-QPID_AUTO_TEST_CASE(testInitialSize) {
- InitialStatusMap map(MemberId(0), 3);
- map.configChange(list_of<MemberId>(0)(1));
- map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), newcomerStatus());
- BOOST_CHECK(!map.isComplete());
-
- map.configChange(list_of<MemberId>(0)(1)(2));
- map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), newcomerStatus());
- map.received(MemberId(2), newcomerStatus());
- BOOST_CHECK(map.isComplete());
-}
-
QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) {
InitialStatusMap map(MemberId(0), 3);
map.configChange(list_of<MemberId>(0)(1)(2));
@@ -244,8 +230,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) {
BOOST_CHECK(!map.isUpdateNeeded());
}
-// FIXME aconway 2009-11-20: consistency tests for mixed stores,
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am Tue Mar 23 18:00:49 2010
@@ -17,8 +17,6 @@
# under the License.
#
-SUBDIRS = . testagent
-
AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK
INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src
PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only
@@ -381,3 +379,5 @@ python_prep:
--prefix=$(PYTHON_BLD_DIR) --install-lib=$(PYTHON_BLD_DIR) \
--install-scripts=$(PYTHON_BLD_DIR)/commands; \
else echo "WARNING: python client not built, missing $(PYTHON_SRC_DIR)"; fi
+
+include testagent.mk
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Mar 23 18:00:49 2010
@@ -336,6 +336,12 @@ QPID_AUTO_TEST_CASE(testMapMessage)
MapContent content(out);
content["abc"] = "def";
content["pi"] = 3.14f;
+ Variant utf8("A utf 8 string");
+ utf8.setEncoding("utf8");
+ content["utf8"] = utf8;
+ Variant utf16("\x00\x61\x00\x62\x00\x63");
+ utf16.setEncoding("utf16");
+ content["utf16"] = utf16;
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
@@ -343,6 +349,10 @@ QPID_AUTO_TEST_CASE(testMapMessage)
MapView view(in);
BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+ BOOST_CHECK_EQUAL(view["utf8"].asString(), utf8.asString());
+ BOOST_CHECK_EQUAL(view["utf8"].getEncoding(), utf8.getEncoding());
+ BOOST_CHECK_EQUAL(view["utf16"].asString(), utf16.asString());
+ BOOST_CHECK_EQUAL(view["utf16"].getEncoding(), utf16.getEncoding());
fix.session.acknowledge();
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp Tue Mar 23 18:00:49 2010
@@ -178,6 +178,21 @@ QPID_AUTO_TEST_CASE(testIsEqualTo)
BOOST_CHECK_EQUAL(a, b);
}
+QPID_AUTO_TEST_CASE(testEncoding) // Checks that an encoding set on a Variant is kept by copies and by map entries.
+{
+ Variant a("abc");
+ a.setEncoding("utf8");
+ Variant b = a; // b is copy-initialised from a after the encoding was set
+ Variant map = Variant::Map();
+ map.asMap()["a"] = a; // store both variants as entries of a Variant map
+ map.asMap()["b"] = b;
+ BOOST_CHECK_EQUAL(a.getEncoding(), std::string("utf8")); // setter/getter round trip
+ BOOST_CHECK_EQUAL(a.getEncoding(), b.getEncoding()); // copy carries the encoding
+ BOOST_CHECK_EQUAL(a.getEncoding(), map.asMap()["a"].getEncoding()); // map entries carry the encoding
+ BOOST_CHECK_EQUAL(b.getEncoding(), map.asMap()["b"].getEncoding());
+ BOOST_CHECK_EQUAL(map.asMap()["a"].getEncoding(), map.asMap()["b"].getEncoding()); // both entries agree
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail Tue Mar 23 18:00:49 2010
@@ -1,3 +1,3 @@
-cluster_tests.LongTests.test_management
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org