You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC

svn commit: r926686 [3/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/ cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/ cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/ cpp/include/qpid/client/amqp0_10/ cpp/incl...

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 23 18:00:49 2010
@@ -120,8 +120,8 @@ class Cluster : private Cpg::Handler, pu
 
     bool isElder() const;
 
-    // For debugging only. Can only be called in deliver thread.
-    void debugSnapshot(const char*, Connection* =0);
+    // Generates a log message for debugging purposes.
+    std::string debugSnapshot();
 
   private:
     typedef sys::Monitor::ScopedLock Lock;
@@ -160,7 +160,6 @@ class Cluster : private Cpg::Handler, pu
                        const framing::Uuid& clusterId,
                        framing::cluster::StoreState,
                        const framing::Uuid& shutdownId,
-                       const framing::SequenceNumber& configSeq,
                        const std::string& firstConfig,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
@@ -181,6 +180,7 @@ class Cluster : private Cpg::Handler, pu
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
     void erase(const ConnectionId&, Lock&);       
+    void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
 
@@ -252,7 +252,8 @@ class Cluster : private Cpg::Handler, pu
 
     //    Local cluster state, cluster map
     enum {
-        INIT,    ///< Establishing inital cluster stattus.
+        PRE_INIT,///< Have not yet received complete initial status map.
+        INIT,    ///< Waiting to reach cluster-size.
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -77,9 +77,9 @@ const std::string shadowPrefix("[shadow]
 // Shadow connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId,
-                       const ConnectionId& id, unsigned int ssf)
+                       const ConnectionId& id, const qpid::sys::SecuritySettings& external)
     : cluster(c), self(id), catchUp(false), output(*this, out),
-      connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true),
+      connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
       expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
       updateIn(c.getUpdateReceiver())
@@ -88,11 +88,11 @@ Connection::Connection(Cluster& c, sys::
 // Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId, MemberId member,
-                       bool isCatchUp, bool isLink, unsigned int ssf
+                       bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
 ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
     connectionCtor(&output, cluster.getBroker(),
                    mgmtId,
-                   ssf,
+                   external,
                    isLink,
                    isCatchUp ? ++catchUpId : 0,
                    isCatchUp),  // isCatchUp => shadow
@@ -107,7 +107,10 @@ Connection::Connection(Cluster& c, sys::
         QPID_LOG(info, "new client connection " << *this);
         giveReadCredit(cluster.getSettings().readMax);
         cluster.getMulticast().mcastControl(
-            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId());
+            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
+                                          connectionCtor.external.ssf,
+                                          connectionCtor.external.authid,
+                                          connectionCtor.external.nodict), getId());
     }
     else {
         // Catch-up shadow connections initialized using nextShadow id.
@@ -122,7 +125,7 @@ Connection::Connection(Cluster& c, sys::
 void Connection::init() {
     connection = connectionCtor.construct();
     QPID_LOG(debug, cluster << " initialized connection: " << *this
-             << " ssf=" << connection->getSSF());
+             << " ssf=" << connection->getExternalSecuritySettings().ssf);
     if (isLocalClient()) {  
         // Actively send cluster-order frames from local node
         connection->setClusterOrderOutput(mcastFrameHandler);
@@ -142,9 +145,11 @@ void Connection::giveReadCredit(int cred
         output.giveReadCredit(credit);
 }
 
-void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
     QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
-    QPID_ASSERT(ssf == connectionCtor.ssf);
+    QPID_ASSERT(ssf == connectionCtor.external.ssf);
+    QPID_ASSERT(authid == connectionCtor.external.authid);
+    QPID_ASSERT(nodict == connectionCtor.external.nodict);
     init();
 }
 
@@ -537,7 +542,7 @@ void Connection::addQueueListener(const 
 void Connection::managementSchema(const std::string& data) {
     management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
     if (!agent)
-        throw Exception(QPID_MSG("Management schema update but no management agent."));
+        throw Exception(QPID_MSG("Management schema update but management not enabled."));
     framing::Buffer buf(const_cast<char*>(data.data()), data.size());
     agent->importSchemas(buf);
     QPID_LOG(debug, cluster << " updated management schemas");
@@ -552,7 +557,7 @@ void Connection::managementSetupState(ui
 	     << objectNum << " seq " << bootSequence);
     management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
     if (!agent)
-        throw Exception(QPID_MSG("Management schema update but no management agent."));
+        throw Exception(QPID_MSG("Management schema update but management not enabled."));
     agent->setNextObjectId(objectNum);
     agent->setBootSequence(bootSequence);
 }
@@ -560,7 +565,7 @@ void Connection::managementSetupState(ui
 void Connection::managementAgents(const std::string& data) {
     management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
     if (!agent)
-        throw Exception(QPID_MSG("Management agents update but no management agent."));
+        throw Exception(QPID_MSG("Management agent update but management not enabled."));
     framing::Buffer buf(const_cast<char*>(data.data()), data.size());
     agent->importAgents(buf);
     QPID_LOG(debug, cluster << " updated management agents");

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Connection.h Tue Mar 23 18:00:49 2010
@@ -34,6 +34,7 @@
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/FrameDecoder.h"
 
@@ -66,10 +67,10 @@ class Connection :
     
     /** Local connection. */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
-               unsigned int ssf);
+               const qpid::sys::SecuritySettings& external);
     /** Shadow connection. */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
-               unsigned int ssf);
+               const qpid::sys::SecuritySettings& external);
     ~Connection();
     
     ConnectionId getId() const { return self; }
@@ -163,7 +164,7 @@ class Connection :
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    void announce(const std::string& mgmtId, uint32_t ssf);
+    void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict);
     void abort();
     void deliverClose();
 
@@ -174,7 +175,7 @@ class Connection :
     void managementAgents(const std::string& data);
     void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
 
-    uint32_t getSsf() const { return connectionCtor.ssf; }
+    //uint32_t getSsf() const { return connectionCtor.external.ssf; }
 
   private:
     struct NullFrameHandler : public framing::FrameHandler {
@@ -186,7 +187,7 @@ class Connection :
         sys::ConnectionOutputHandler* out;
         broker::Broker& broker;
         std::string mgmtId;
-        unsigned int ssf;
+        qpid::sys::SecuritySettings external;
         bool isLink;
         uint64_t objectId;
         bool shadow;
@@ -195,17 +196,17 @@ class Connection :
             sys::ConnectionOutputHandler* out_,
             broker::Broker& broker_,
             const std::string& mgmtId_,
-            unsigned int ssf_,
+            const qpid::sys::SecuritySettings& external_,
             bool isLink_=false,
             uint64_t objectId_=0,
             bool shadow_=false
-        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
+        ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
             isLink(isLink_), objectId(objectId_), shadow(shadow_)
         {}
 
         std::auto_ptr<broker::Connection> construct() {
             return std::auto_ptr<broker::Connection>(
-                new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId, shadow));
+                new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
         }
     };
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Mar 23 18:00:49 2010
@@ -37,26 +37,26 @@ using namespace framing;
 
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
-                                 unsigned int ssf) {
+                                 const qpid::sys::SecuritySettings& external) {
     if (v == ProtocolVersion(0, 10))
-        return new ConnectionCodec(v, out, id, cluster, false, false, ssf);
+        return new ConnectionCodec(v, out, id, cluster, false, false, external);
     else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
-        return new ConnectionCodec(v, out, id, cluster, true, false, ssf); 
+        return new ConnectionCodec(v, out, id, cluster, true, false, external); 
     return 0;
 }
 
 // Used for outgoing Link connections
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId,
-                                 unsigned int ssf) {
-    return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, ssf);
+                                 const qpid::sys::SecuritySettings& external) {
+    return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, external);
 }
 
 ConnectionCodec::ConnectionCodec(
     const ProtocolVersion& v, sys::OutputControl& out,
-    const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, unsigned int ssf
+    const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external
 ) : codec(out, logId, isLink),
-    interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, ssf))
+    interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, external))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Mar 23 18:00:49 2010
@@ -53,14 +53,14 @@ class ConnectionCodec : public sys::Conn
         Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
             : next(f), cluster(c) {}
         sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
-                                     unsigned int conn_ssf);
+                                     const qpid::sys::SecuritySettings& external);
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id,
-                                     unsigned int conn_ssf);
+                                     const qpid::sys::SecuritySettings& external);
     };
 
     ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
                     const std::string& logId, Cluster& c, bool catchUp, bool isLink,
-                    unsigned int ssf);
+                    const qpid::sys::SecuritySettings& external);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Mar 23 18:00:49 2010
@@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(co
 }
 
 bool InitialStatusMap::isComplete() const {
-    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
-        && (map.size() >= size);
+    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
 }
 
 bool InitialStatusMap::transitionToComplete() {
@@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded() 
     return ret;
 }
 
-bool InitialStatusMap::isActive(const Map::value_type& v) {
+bool InitialStatusMap::isActiveEntry(const Map::value_type& v) {
     return v.second && v.second->getActive();
 }
 
@@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Ma
          v.second->getStoreState() == STORE_STATE_DIRTY_STORE);
 }
 
+bool InitialStatusMap::isActive() {
+    assert(isComplete());
+    return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end());
+}
+
 bool InitialStatusMap::isUpdateNeeded() {
     assert(isComplete());
     // We need an update if there are any active members.
-    if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+    if (isActive()) return true;
 
     // Otherwise it depends on store status, get my own status:
     Map::iterator me = map.find(self);
@@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders() 
 Uuid InitialStatusMap::getClusterId() {
     assert(isComplete());
     assert(!map.empty());
-    Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+    Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry);
     if (i != map.end())
         return i->second->getClusterId(); // An active member
     else
@@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent()
     Uuid clusterId;
     Uuid shutdownId;
 
+    bool initialCluster = !isActive();
     for (Map::iterator i = map.begin(); i != map.end(); ++i) {
         assert(i->second);
         if (i->second->getActive()) ++active;
@@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent()
             ++clean;
             checkId(clusterId, i->second->getClusterId(),
                     "Cluster-ID mismatch. Stores belong to different clusters.");
-            checkId(shutdownId, i->second->getShutdownId(),
-                    "Shutdown-ID mismatch. Stores were not shut down together");
+            // Only need shutdownId to match if we are in an initially forming cluster.
+            if (initialCluster)
+                checkId(shutdownId, i->second->getShutdownId(),
+                        "Shutdown-ID mismatch. Stores were not shut down together");
             break;
         }
     }
@@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent()
     if (none && (clean+dirty+empty))
         throw Exception("Mixing transient and persistent brokers in a cluster");
 
-    // If there are no active members and there are dirty stores there
-    // must be at least one clean store.
-    if (!active && dirty && !clean)
-        throw Exception("Cannot recover, no clean store.");
+    if (map.size() >= size) {
+        // All initial members are present. If there are no active
+        // members and there are dirty stores there must be at least
+        // one clean store.
+        if (!active && dirty && !clean)
+            throw Exception("Cannot recover, no clean store.");
+    }
 }
 
 std::string InitialStatusMap::getFirstConfigStr() const {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Mar 23 18:00:49 2010
@@ -31,6 +31,11 @@ namespace cluster {
 
 /**
  * Track status of cluster members during initialization.
+ *
+ * When a new member joins the CPG cluster, all members send an initial-status
+ * control. This map tracks those controls and provides data to make decisions
+ * about joining the cluster.
+ *
  */
 class InitialStatusMap
 {
@@ -38,7 +43,7 @@ class InitialStatusMap
     typedef framing::ClusterInitialStatusBody Status;
 
     InitialStatusMap(const MemberId& self, size_t size);
-    /** Process a config change. @return true if we need to re-send our status */
+    /** Process a config change. May make isResendNeeded() true. */
     void configChange(const MemberSet& newConfig);
     /** @return true if we need to re-send status */
     bool isResendNeeded();
@@ -46,13 +51,19 @@ class InitialStatusMap
     /** Process received status */
     void received(const MemberId&, const Status& is);
 
-    /**@return true if the map is complete. */
+    /**@return true if the map has an entry for all current cluster members. */
     bool isComplete() const;
+
+    size_t getActualSize() const { return map.size(); }
+    size_t getRequiredSize() const { return size; }
+
     /**@return true if the map was completed by the last config change or received. */
     bool transitionToComplete();
     /**@pre isComplete(). @return this node's elders */
     MemberSet getElders() const;
-    /**@pre isComplete(). @return True if we need an update. */
+    /**@pre isComplete(). @return True if there are active members of the cluster. */
+    bool isActive();
+    /**@pre isComplete(). @return True if we need to request an update. */
     bool isUpdateNeeded();
     /**@pre isComplete(). @return Cluster-wide cluster ID. */
     framing::Uuid getClusterId();
@@ -66,8 +77,9 @@ class InitialStatusMap
   private:
     typedef std::map<MemberId, boost::optional<Status> > Map;
     static bool notInitialized(const Map::value_type&);
-    static bool isActive(const Map::value_type&);
+    static bool isActiveEntry(const Map::value_type&);
     static bool hasStore(const Map::value_type&);
+
     Map map;
     MemberSet firstConfig;
     MemberId self;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Mar 23 18:00:49 2010
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_, 
                          boost::function<void()> onError_) :
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
-    ready(false)
-{
-    queue.start();
-}
+    ready(false), bypass(true)
+{}
 
 void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
     mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) 
         }
     }
     QPID_LOG(trace, "MCAST " << e);
-    queue.push(e);
+    if (bypass) {               // direct, don't queue
+        iovec iov = e.toIovec();
+        // FIXME aconway 2010-03-10: should do limited retry.
+        while (!cpg.mcast(&iov, 1))
+            ;
+    }
+    else
+        queue.push(e);
 }
 
-
 Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
     try {
         PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::
     }
 }
 
+void Multicaster::start() {
+    queue.start();
+    bypass = false;
+}
+
 void Multicaster::setReady() {
     sys::Mutex::ScopedLock l(lock);
     ready = true;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Mar 23 18:00:49 2010
@@ -41,16 +41,18 @@ class Cpg;
 
 /**
  * Multicast to the cluster. Shared, thread safe object.
- *
- * Runs in two modes;
  * 
- * initializing: Hold connection mcast events. Multicast cluster
- * events directly in the calling thread. This mode is used before
- * joining the cluster where the poller may not yet be active and we
- * want to hold any connection traffic till we join.
+ * holding mode: Hold connection events for later multicast. Cluster
+ * events are never held.  Used during PRE_INIT/INIT state when we
+ * want to hold any connection traffic till we are ready in the
+ * cluster.
+ *
+ * bypass mode: Multicast cluster events directly in the calling
+ * thread. This mode is used by cluster in PRE_INIT state, when the
+ * poller is not yet active.
  *
- * ready: normal operation. Queues all mcasts on a pollable queue,
- * multicasts connection and cluster events.
+ * Multicaster is created in bypass+holding mode; these modes are
+ * disabled by start() and setReady() respectively.
  */
 class Multicaster
 {
@@ -65,7 +67,9 @@ class Multicaster
     void mcastBuffer(const char*, size_t, const ConnectionId&);
     void mcast(const Event& e);
 
-    /** Switch to ready mode. */
+    /** Start the pollable queue, turn off bypass mode. */
+    void start();
+    /** Switch to ready mode, release held messages. */
     void setReady();
 
   private:
@@ -81,6 +85,7 @@ class Multicaster
     bool ready;
     PlainEventQueue holdingQueue;
     std::vector<struct ::iovec> ioVector;
+    bool bypass;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Mar 23 18:00:49 2010
@@ -31,6 +31,13 @@ namespace cluster {
 /**
  * More convenient version of PollableQueue that handles iterating
  * over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disables bypass mode.
  */
 template <class T> class PollableQueue : public sys::PollableQueue<T> {
   public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue :
                   const boost::shared_ptr<sys::Poller>& poller)
         : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
                                 poller),
-          callback(f), error(err), message(msg)
+          callback(f), error(err), message(msg), bypass(true)
     {}
 
     typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue :
         }
     }
 
+    void push(const T& t) {
+        if (bypass) callback(t);
+        else sys::PollableQueue<T>::push(t);
+    }
+
+    void start() {
+        bypass = false;
+        sys::PollableQueue<T>::start();
+    }
+
   private:
     Callback callback;
     ErrorCallback error;
     std::string message;
+    bool bypass;
 };
 
     

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Tue Mar 23 18:00:49 2010
@@ -21,6 +21,7 @@
 #include "StoreStatus.h"
 #include "qpid/Exception.h"
 #include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
 #include <boost/filesystem/path.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/operations.hpp>
@@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) {
     Uuid ret;
     if (exists(path)) {
         fs::ifstream i(path);
-        throw_exceptions(i);
-        i >> ret;
+        try {
+            throw_exceptions(i);
+            i >> ret;
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
+            throw;
+        }
     }
     return ret;
 }
 
 void saveUuid(const fs::path& path, const Uuid& uuid) {
     fs::ofstream o(path);
-    throw_exceptions(o);
-    o << uuid;
+    try {
+        throw_exceptions(o);
+        o << uuid;
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
+        throw;
+    }
 }
 
 framing::SequenceNumber loadSeqNum(const fs::path& path) {
     uint32_t n = 0;
     if (exists(path)) {
         fs::ifstream i(path);
-        throw_exceptions(i);
-        i >> n;
+        try {
+            throw_exceptions(i);
+            i >> n;
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
+            throw;
+        }
     }
     return framing::SequenceNumber(n);
 }
@@ -105,9 +121,14 @@ void StoreStatus::save() {
         create_directory(dir);
         saveUuid(dir/CLUSTER_ID_FILE, clusterId);
         saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
-        fs::ofstream o(dir/CONFIG_SEQ_FILE);
-        throw_exceptions(o);
-        o << configSeq.getValue();
+        try {
+            fs::ofstream o(dir/CONFIG_SEQ_FILE);
+            throw_exceptions(o);
+            o << configSeq.getValue();
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what());
+            throw;
+        }
     }
     catch (const std::exception&e) {
         throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar 23 18:00:49 2010
@@ -158,6 +158,12 @@ void UpdateClient::update() {
     connection.close();
     QPID_LOG(debug,  updaterId << " update completed to " << updateeId
              << " at " << updateeUrl << ": " << membership);
+    // FIXME aconway 2010-03-15: This sleep avoids the race condition
+    // described in https://bugzilla.redhat.com/show_bug.cgi?id=568831.
+    // It allows the connection to fully close before destroying the
+    // Connection object. Remove when the bug is fixed.
+    //
+    sys::usleep(10*1000);       // 10ms (argument is in microseconds)
 }
 
 namespace {

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/framing/FieldValue.cpp Tue Mar 23 18:00:49 2010
@@ -130,6 +130,21 @@ Str16Value::Str16Value(const std::string
             reinterpret_cast<const uint8_t*>(v.data()+v.size())))
 {}
 
+Var16Value::Var16Value(const std::string& v, uint8_t code) :
+    FieldValue(
+        code,
+        new VariableWidthValue<2>(
+            reinterpret_cast<const uint8_t*>(v.data()),
+            reinterpret_cast<const uint8_t*>(v.data()+v.size())))
+{}
+Var32Value::Var32Value(const std::string& v, uint8_t code) :
+    FieldValue(
+        code,
+        new VariableWidthValue<4>(
+            reinterpret_cast<const uint8_t*>(v.data()),
+            reinterpret_cast<const uint8_t*>(v.data()+v.size())))
+{}
+
 Struct32Value::Struct32Value(const std::string& v) :
     FieldValue(
         0xAB,

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Mar 23 18:00:49 2010
@@ -96,11 +96,13 @@ ManagementAgent::~ManagementAgent ()
         Mutex::ScopedLock lock (userLock);
 
         // Reset the shared pointers to exchanges.  If this is not done now, the exchanges
-        // will stick around until dExchange and mExchange are implicitely destroyed (long
+        // will stick around until dExchange and mExchange are implicitly destroyed (long
         // after this destructor completes).  Those exchanges hold references to management
         // objects that will be invalid.
         dExchange.reset();
         mExchange.reset();
+        v2Topic.reset();
+        v2Direct.reset();
 
         moveNewObjectsLH();
         for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -183,13 +185,20 @@ void ManagementAgent::writeData ()
     }
 }
 
-void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
-                                   qpid::broker::Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+                                  qpid::broker::Exchange::shared_ptr _dexchange)
 {
     mExchange = _mexchange;
     dExchange = _dexchange;
 }
 
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+                                    qpid::broker::Exchange::shared_ptr _dexchange)
+{
+    v2Topic = _texchange;
+    v2Direct = _dexchange;
+}
+
 void ManagementAgent::registerClass (const string&  packageName,
                                      const string&  className,
                                      uint8_t* md5Sum,
@@ -210,22 +219,19 @@ void ManagementAgent::registerEvent (con
     addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
 }
 
-
 // Deprecated:
-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool publishNow)
+ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId)
 {
     // always force object to generate key string
-    return addObject(object, std::string(), persistId != 0, publishNow);
+    return addObject(object, std::string(), persistId != 0);
 }
 
 
 
 ObjectId ManagementAgent::addObject(ManagementObject* object,
                                     const std::string& key,
-                                    bool persistent,
-                                    bool publishNow)
+                                    bool persistent)
 {
-    Mutex::ScopedLock lock (addLock);
     uint16_t sequence;
 
     sequence = persistent ? 0 : bootSequence;
@@ -238,45 +244,21 @@ ObjectId ManagementAgent::addObject(Mana
     }
 
     object->setObjectId(objId);
-    ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
-    if (destIter != newManagementObjects.end()) {
-        if (destIter->second->isDeleted()) {
-            newDeletedManagementObjects.push_back(destIter->second);
-            newManagementObjects.erase(destIter);
-        } else {
-            QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
-                     " key=" << objId.getV2Key());
-            return objId;
-        }
-    }
-    newManagementObjects[objId] = object;
-
-    if (publishNow) {
-        ::qpid::messaging::Message m;
-        ::qpid::messaging::ListContent content(m);
-        ::qpid::messaging::Variant::List &list_ = content.asList();
-        ::qpid::messaging::Variant::Map  map_;
-        ::qpid::messaging::Variant::Map values;
-        ::qpid::messaging::Variant::Map  headers;
-
-        map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
-                                               object->getClassName(),
-                                               "_data",
-                                               object->getMd5Sum());
-        object->mapEncodeValues(values, true, false);  // send props only
-        map_["_values"] = values;
-        list_.push_back(map_);
-
-        headers["method"] = "indication";
-        headers["qmf.opcode"] = "_data_indication";
-        headers["qmf.content"] = "_data";
-        headers["qmf.agent"] = std::string(agentName);
 
-        content.encode();
-        stringstream key;
-        key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
-        sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
-        QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
+    {
+        Mutex::ScopedLock lock (addLock);
+        ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+        if (destIter != newManagementObjects.end()) {
+            if (destIter->second->isDeleted()) {
+                newDeletedManagementObjects.push_back(destIter->second);
+                newManagementObjects.erase(destIter);
+            } else {
+                QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+                         " key=" << objId.getV2Key());
+                return objId;
+            }
+        }
+        newManagementObjects[objId] = object;
     }
 
     return objId;
@@ -350,11 +332,11 @@ void ManagementAgent::clientAdded (const
 }
 
 void ManagementAgent::clusterUpdate() {
-    // Called on all cluster memebesr when a new member joins a cluster.
+    // Called on all cluster members when a new member joins a cluster.
     // Set clientWasAdded so that on the next periodicProcessing we will do 
     // a full update on all cluster members.
     clientWasAdded = true;
-    debugSnapshot("update");
+    QPID_LOG(debug, "cluster update " << debugSnapshot());
 }
 
 void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -670,7 +652,7 @@ void ManagementAgent::periodicProcessing
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
-    debugSnapshot("periodic");
+    QPID_LOG(debug, "periodic update " << debugSnapshot());
 }
 
 void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
@@ -1271,7 +1253,7 @@ void ManagementAgent::handleAttachReques
     agent->mgmtObject->set_systemId     ((const unsigned char*)systemId.data());
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
-    addObject (agent->mgmtObject, 0, true);
+    addObject (agent->mgmtObject, 0);
     remoteAgents[connectionRef] = agent;
 
     QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
@@ -1893,13 +1875,13 @@ size_t ManagementAgent::validateEventSch
 
 void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
 {
-    Mutex::ScopedLock lock (addLock);
+    Mutex::ScopedLock lock (userLock);
     allocator = a;
 }
 
 uint64_t ManagementAgent::allocateId(Manageable* object)
 {
-    Mutex::ScopedLock lock (addLock);
+    Mutex::ScopedLock lock (userLock);
     if (allocator.get()) return allocator->getIdFor(object);
     return 0;
 }
@@ -2031,7 +2013,7 @@ void ManagementAgent::importSchemas(qpid
 
 void ManagementAgent::RemoteAgent::mapEncode(qpid::messaging::Variant::Map& map_) const {
     ::qpid::messaging::VariantMap _objId, _values;
-    
+
     map_["_brokerBank"] = brokerBank;
     map_["_agentBank"] = agentBank;
     map_["_routingKey"] = routingKey;
@@ -2068,10 +2050,10 @@ void ManagementAgent::RemoteAgent::mapDe
         mgmtObject->mapDecodeValues(i->second.asMap());
     }
 
-    agent.addObject(mgmtObject, 0, true);
+    // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
+    mgmtObject->set_connectionRef(connectionRef);
 }
 
-
 void ManagementAgent::exportAgents(std::string& out) {
     ::qpid::messaging::Message m;
     ::qpid::messaging::ListContent content(m);
@@ -2082,15 +2064,12 @@ void ManagementAgent::exportAgents(std::
          i != remoteAgents.end();
          ++i)
     {
-        ObjectId id = i->first;
+        // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
         RemoteAgent* agent = i->second;
 
         map_.clear();
-        omap.clear();
         amap.clear();
 
-        id.mapEncode(omap);
-        map_["_object_id"] = omap;
         agent->mapEncode(amap);
         map_["_remote_agent"] = amap;
         list_.push_back(map_);
@@ -2123,16 +2102,16 @@ void ManagementAgent::importAgents(qpid:
     }
 }
 
-void ManagementAgent::debugSnapshot(const char* type) {
+std::string ManagementAgent::debugSnapshot() {
     std::ostringstream msg;
-    msg << type << " snapshot, agents:";
+    msg << " management snapshot:";
     for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
          i != remoteAgents.end(); ++i)
         msg << " " << i->second->routingKey;
     msg << " packages: " << packages.size();
     msg << " objects: " << managementObjects.size();
     msg << " new objects: " << newManagementObjects.size();
-    QPID_LOG(trace, msg.str());
+    return msg.str();
 }
 
 qpid::messaging::Variant::Map ManagementAgent::toMap(const FieldTable& from)

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Mar 23 18:00:49 2010
@@ -75,9 +75,12 @@ public:
     /** Called by cluster to suppress management output during update. */
     void suppress(bool s) { suppressed = s; }
 
-    void setInterval     (uint16_t _interval) { interval = _interval; }
-    void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
-                          qpid::broker::Exchange::shared_ptr directExchange);
+    void setInterval(uint16_t _interval) { interval = _interval; }
+    void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+                     qpid::broker::Exchange::shared_ptr directExchange);
+    void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+                       qpid::broker::Exchange::shared_ptr directExchange);
+
     int  getMaxThreads   () { return threadPoolSize; }
     QPID_BROKER_EXTERN void registerClass   (const std::string& packageName,
                                              const std::string& className,
@@ -88,12 +91,10 @@ public:
                                              uint8_t*    md5Sum,
                                              ManagementObject::writeSchemaCall_t schemaCall);
     QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject* object,
-                                             uint64_t          persistId = 0,
-                                             bool              publishNow = false);
+                                             uint64_t          persistId = 0);
     QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject*  object,
                                              const std::string& key,
-                                             bool               persistent = true,
-                                             bool               publishNow = false);
+                                             bool               persistent = true);
     QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);
@@ -238,10 +239,18 @@ private:
     ManagementObjectVector       newDeletedManagementObjects;
 
     framing::Uuid                uuid;
-    sys::Mutex                   addLock;
-    sys::Mutex                   userLock;
+
+    //
+    // Lock hierarchy:  If a thread needs to take both addLock and userLock,
+    // it MUST take userLock first, then addLock.
+    //
+    sys::Mutex userLock;
+    sys::Mutex addLock;
+
     qpid::broker::Exchange::shared_ptr mExchange;
     qpid::broker::Exchange::shared_ptr dExchange;
+    qpid::broker::Exchange::shared_ptr v2Topic;
+    qpid::broker::Exchange::shared_ptr v2Direct;
     std::string                  dataDir;
     uint16_t                     interval;
     qpid::broker::Broker*        broker;
@@ -320,7 +329,7 @@ private:
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);
     size_t validateEventSchema(framing::Buffer&);
-    void debugSnapshot(const char*);
+    std::string debugSnapshot();
 };
 
 }}

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/AddressParser.cpp Tue Mar 23 18:00:49 2010
@@ -198,6 +198,7 @@ bool AddressParser::readSimpleValue(Vari
     std::string s;
     if (readWord(s)) {
         value = s;
+        try { value = value.asInt32(); return true; } catch (const InvalidConversion&) {}
         try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
         try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
         return true;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -23,23 +23,17 @@
 #include "qpid/messaging/ConnectionImpl.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/SessionImpl.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 #include "qpid/client/amqp0_10/ConnectionImpl.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Connection> PI;
-
-}
-
 namespace messaging {
 
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Connection> PI;
 
 Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); }
-Connection::Connection(const Connection& c) : qpid::client::Handle<ConnectionImpl>() { PI::copy(*this, c); }
+Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); }
 Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
 Connection::~Connection() { PI::dtor(*this); }
 
@@ -67,40 +61,11 @@ Session Connection::newSession(bool tran
     return impl->newSession(transactional, name);
 }
 Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
-
-InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
-
-void parseKeyValuePair(const std::string& in, Variant::Map& out)
-{
-    std::string::size_type i = in.find('=');
-    if (i == std::string::npos || i == in.size() || in.find('=', i+1) != std::string::npos) {
-        throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in));
-    } else {
-        out[in.substr(0, i)] = in.substr(i+1);
-    }
-}
-
-void parseOptionString(const std::string& in, Variant::Map& out)
-{
-    std::string::size_type start = 0;
-    std::string::size_type i = in.find('&');
-    while (i != std::string::npos) {
-        parseKeyValuePair(in.substr(start, i-start), out);
-        if (i < in.size()) {
-            start = i+1;
-            i = in.find('&', start);
-        } else {
-            i = std::string::npos;
-        }
-    }
-    parseKeyValuePair(in.substr(start), out);
+void Connection::setOption(const std::string& name, const Variant& value)
+{ 
+    impl->setOption(name, value);
 }
 
-Variant::Map parseOptionString(const std::string& in)
-{
-    Variant::Map map;    
-    parseOptionString(in, map);
-    return map;
-}
+InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
 
 }} // namespace qpid::messaging

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Tue Mar 23 18:00:49 2010
@@ -25,12 +25,10 @@
 #include "qpid/RefCounted.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 class Session;
+class Variant;
 
 class ConnectionImpl : public virtual qpid::RefCounted
 {
@@ -40,6 +38,7 @@ class ConnectionImpl : public virtual qp
     virtual void close() = 0;
     virtual Session newSession(bool transactional, const std::string& name) = 0;
     virtual Session getSession(const std::string& name) const = 0;
+    virtual void setOption(const std::string& name, const Variant& value) = 0;
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Receiver.cpp Tue Mar 23 18:00:49 2010
@@ -22,21 +22,15 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/ReceiverImpl.h"
 #include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 
 namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Receiver> PI;
-
-}
-
 namespace messaging {
 
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Receiver> PI;
 
 Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); }
-Receiver::Receiver(const Receiver& s) : qpid::client::Handle<ReceiverImpl>() { PI::copy(*this, s); }
+Receiver::Receiver(const Receiver& s) : Handle<ReceiverImpl>() { PI::copy(*this, s); }
 Receiver::~Receiver() { PI::dtor(*this); }
 Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
 bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Tue Mar 23 18:00:49 2010
@@ -24,9 +24,6 @@
 #include "qpid/RefCounted.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 class Message;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Sender.cpp Tue Mar 23 18:00:49 2010
@@ -22,21 +22,14 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/SenderImpl.h"
 #include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 
 namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Sender> PI;
-
-}
-
 namespace messaging {
-
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Sender> PI;
 
 Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); }
-Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); }
+Sender::Sender(const Sender& s) : qpid::messaging::Handle<SenderImpl>() { PI::copy(*this, s); }
 Sender::~Sender() { PI::dtor(*this); }
 Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
 void Sender::send(const Message& message) { impl->send(message); }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SenderImpl.h Tue Mar 23 18:00:49 2010
@@ -24,9 +24,6 @@
 #include "qpid/RefCounted.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 class Message;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Session.cpp Tue Mar 23 18:00:49 2010
@@ -25,21 +25,15 @@
 #include "qpid/messaging/Sender.h"
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/SessionImpl.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 
 namespace qpid {
-namespace client {
-
-typedef PrivateImplRef<qpid::messaging::Session> PI;
-
-}
-
 namespace messaging {
 
-using qpid::client::PI;
+typedef PrivateImplRef<qpid::messaging::Session> PI;
 
 Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); }
-Session::Session(const Session& s) : qpid::client::Handle<SessionImpl>() { PI::copy(*this, s); }
+Session::Session(const Session& s) : Handle<SessionImpl>() { PI::copy(*this, s); }
 Session::~Session() { PI::dtor(*this); }
 Session& Session::operator=(const Session& s) { return PI::assign(*this, s); }
 void Session::commit() { impl->commit(); }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/SessionImpl.h Tue Mar 23 18:00:49 2010
@@ -26,9 +26,6 @@
 #include "qpid/messaging/Duration.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 class Address;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Variant.cpp Tue Mar 23 18:00:49 2010
@@ -20,6 +20,7 @@
  */
 #include "qpid/messaging/Variant.h"
 #include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
 #include <algorithm>
@@ -50,7 +51,7 @@ class VariantImpl
     VariantImpl(int64_t);
     VariantImpl(float);
     VariantImpl(double);
-    VariantImpl(const std::string&);
+    VariantImpl(const std::string&, const std::string& encoding=std::string());
     VariantImpl(const Variant::Map&);
     VariantImpl(const Variant::List&);
     VariantImpl(const Uuid&);
@@ -130,7 +131,8 @@ VariantImpl::VariantImpl(int32_t i) : ty
 VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; }
 VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; }
 VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; }
-VariantImpl::VariantImpl(const std::string& s) : type(VAR_STRING) { value.v = new std::string(s); }
+VariantImpl::VariantImpl(const std::string& s, const std::string& e)
+    : type(VAR_STRING), encoding(e) { value.v = new std::string(s); }
 VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.v = new Variant::Map(m); }
 VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); }
 VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); }
@@ -448,7 +450,7 @@ VariantImpl* VariantImpl::create(const V
       case VAR_INT64: return new VariantImpl(v.asInt64());
       case VAR_FLOAT: return new VariantImpl(v.asFloat());
       case VAR_DOUBLE: return new VariantImpl(v.asDouble());
-      case VAR_STRING: return new VariantImpl(v.asString());
+      case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding());
       case VAR_MAP: return new VariantImpl(v.asMap());
       case VAR_LIST: return new VariantImpl(v.asList());
       case VAR_UUID: return new VariantImpl(v.asUuid());

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Mar 23 18:00:49 2010
@@ -22,6 +22,7 @@
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/log/Statement.h"
@@ -144,7 +145,7 @@ void AsynchIOHandler::readbuff(AsynchIO&
             decoded = in.getPosition();
             QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
             try {
-                codec = factory->create(protocolInit.getVersion(), *this, identifier, 0);
+                codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
                 if (!codec) {
                     //TODO: may still want to revise this...
                     //send valid version header & close connection.
@@ -200,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&)
 
 void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
-        codec = factory->create(*this, identifier, 0);
+        codec = factory->create(*this, identifier, SecuritySettings());
         write(framing::ProtocolInitiation(codec->getVersion()));
         return;
     }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ConnectionCodec.h Tue Mar 23 18:00:49 2010
@@ -30,6 +30,7 @@ namespace sys {
 
 class InputHandlerFactory;
 class OutputControl;
+struct SecuritySettings;
 
 /**
  * Interface of coder/decoder for a connection of a specific protocol
@@ -49,27 +50,15 @@ class ConnectionCodec : public Codec {
     struct Factory {
         virtual ~Factory() {}
 
-        /** Security Strength Factor - indicates the level of security provided
-         * by the underlying transport.  If zero, the transport provides no
-         * security (e.g. TCP). If non-zero, the transport provides some level
-         * of security (e.g. SSL).  The values for SSF can be interpreted as:
-         *
-         * 0 = No protection.
-         * 1 = Integrity checking only.
-         * >1 = Supports authentication, integrity and confidentiality.
-         *      The number represents the encryption key length.
-         */
-
         /** Return 0 if version unknown */
         virtual ConnectionCodec* create(
             framing::ProtocolVersion, OutputControl&, const std::string& id,
-            unsigned int conn_ssf
+            const SecuritySettings&
         ) = 0;
 
         /** Return "preferred" codec for outbound connections. */
         virtual ConnectionCodec* create(
-            OutputControl&, const std::string& id,
-            unsigned int conn_ssf
+            OutputControl&, const std::string& id, const SecuritySettings&
         ) = 0;
     };
 };

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Mar 23 18:00:49 2010
@@ -27,6 +27,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/rdma/RdmaIO.h"
 #include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SecuritySettings.h"
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
@@ -139,7 +140,7 @@ void RdmaIOHandler::initProtocolOut() {
     // but we must be able to send
     assert( codec == 0 );
     assert( aio->writable() && aio->bufferAvailable() );
-    codec = factory->create(*this, identifier, 0);
+    codec = factory->create(*this, identifier, SecuritySettings());
     write(framing::ProtocolInitiation(codec->getVersion()));
 }
 
@@ -186,7 +187,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
         decoded = in.getPosition();
         QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
 
-        codec = factory->create(protocolInit.getVersion(), *this, identifier, 0);
+        codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
 
         // If we failed to create the codec then we don't understand the offered protocol version
         if (!codec) {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Mar 23 18:00:49 2010
@@ -41,14 +41,18 @@ struct SslServerOptions : ssl::SslOption
 {
     uint16_t port;
     bool clientAuth;
+    bool nodict;
 
     SslServerOptions() : port(5671),
-                         clientAuth(false)
+                         clientAuth(false),
+                         nodict(false)
     {
         addOptions()
             ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
             ("ssl-require-client-authentication", optValue(clientAuth), 
-             "Forces clients to authenticate in order to establish an SSL connection");
+             "Forces clients to authenticate in order to establish an SSL connection")
+            ("ssl-sasl-no-dict", optValue(nodict), 
+             "Disables SASL mechanisms that are vulnerable to passive dictionary-based password attacks");
     }
 };
 
@@ -57,6 +61,7 @@ class SslProtocolFactory : public Protoc
     qpid::sys::ssl::SslSocket listener;
     const uint16_t listeningPort;
     std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+    bool nodict;
 
   public:
     SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
@@ -97,7 +102,8 @@ static struct SslPlugin : public Plugin 
                     
                     const broker::Broker::Options& opts = broker->getOptions();
                     ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
-                                                                                opts.connectionBacklog, opts.tcpNoDelay));
+                                                                                opts.connectionBacklog,
+                                                                                opts.tcpNoDelay));
                     QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
                     broker->registerProtocolFactory("ssl", protocol);
                 } catch (const std::exception& e) {
@@ -109,12 +115,13 @@ static struct SslPlugin : public Plugin 
 } sslPlugin;
 
 SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
-    tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth))
+    tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
+    nodict(options.nodict)
 {}
 
 void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
-    qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f);
+    qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getPeerAddress(), f, nodict);
 
     if (tcpNoDelay) {
         s.setTcpNoDelay(tcpNoDelay);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp Tue Mar 23 18:00:49 2010
@@ -42,13 +42,14 @@ struct Buff : public SslIO::BufferBase {
     { delete [] bytes;}
 };
 
-SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f) :
+SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
     readError(false),
-    isClient(false)
+    isClient(false),
+    nodict(_nodict)
 {}
 
 SslHandler::~SslHandler() {
@@ -111,7 +112,7 @@ void SslHandler::readbuff(SslIO& , SslIO
             decoded = in.getPosition();
             QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
             try {
-                codec = factory->create(protocolInit.getVersion(), *this, identifier, aio->getKeyLen());
+                codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio));
                 if (!codec) {
                     //TODO: may still want to revise this...
                     //send valid version header & close connection.
@@ -166,7 +167,7 @@ void SslHandler::nobuffs(SslIO&) {
 
 void SslHandler::idle(SslIO&){
     if (isClient && codec == 0) {
-        codec = factory->create(*this, identifier, aio->getKeyLen());
+        codec = factory->create(*this, identifier, getSecuritySettings(aio));
         write(framing::ProtocolInitiation(codec->getVersion()));
         return;
     }
@@ -183,5 +184,12 @@ void SslHandler::idle(SslIO&){
         aio->queueWriteClose();
 }
 
+// Combine the transport's security settings with this handler's nodict flag.
+SecuritySettings SslHandler::getSecuritySettings(SslIO* aio) {
+    SecuritySettings combined(aio->getSecuritySettings());
+    combined.nodict = nodict;
+    return combined;
+}
+
 
 }}} // namespace qpid::sys::ssl

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h Tue Mar 23 18:00:49 2010
@@ -45,11 +45,13 @@ class SslHandler : public OutputControl 
     ConnectionCodec* codec;
     bool readError;
     bool isClient;
+    bool nodict;
 
     void write(const framing::ProtocolInitiation&);
+    qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
 
   public:
-    SslHandler(std::string id, ConnectionCodec::Factory* f);
+    SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
     ~SslHandler();
     void init(SslIO* a, int numBuffs);
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Tue Mar 23 18:00:49 2010
@@ -436,4 +436,9 @@ void SslIO::close(DispatchHandle& h) {
     }
 }
 
-int SslIO::getKeyLen() {return socket.getKeyLen();}
+SecuritySettings SslIO::getSecuritySettings() {
+    SecuritySettings fromSocket;                     // populated from the underlying SSL socket
+    fromSocket.ssf = socket.getKeyLen();             // ssf is taken from the socket's key length
+    fromSocket.authid = socket.getClientAuthId();    // identity the socket reports for the peer
+    return fromSocket;
+}

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslIo.h Tue Mar 23 18:00:49 2010
@@ -22,6 +22,7 @@
  */
 
 #include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/SecuritySettings.h"
 
 #include <boost/function.hpp>
 #include <deque>
@@ -156,7 +157,7 @@ public:
     bool writeQueueEmpty() { return writeQueue.empty(); }
     BufferBase* getQueuedBuffer();
 
-    int getKeyLen();
+    qpid::sys::SecuritySettings getSecuritySettings();
 
 private:
     ~SslIO();

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp Tue Mar 23 18:00:49 2010
@@ -102,6 +102,34 @@ std::string getService(int fd, bool loca
     return servName;
 }
 
+const std::string DOMAIN_SEPARATOR("@");
+const std::string DC_SEPARATOR(".");
+const std::string DC("DC");
+const std::string DN_DELIMS(" ,=");
+
+/** Build a dotted domain from the DC values of a certificate subject string. */
+std::string getDomainFromSubject(std::string subject)
+{
+    std::string domain;
+    bool expectDcValue = false;   // set once the DC key has just been seen
+    // Tokens are maximal runs of characters not in DN_DELIMS.
+    std::string::size_type begin = subject.find_first_not_of(DN_DELIMS, 0);
+    while (begin != std::string::npos) {
+        const std::string::size_type end = subject.find_first_of(DN_DELIMS, begin);
+        const std::string token = subject.substr(begin, end - begin);
+        if (expectDcValue) {
+            // Value following a DC key: append it, dot-separated after the first.
+            if (!domain.empty()) domain += DC_SEPARATOR;
+            domain += token;
+            expectDcValue = false;
+        } else {
+            expectDcValue = (token == DC);
+        }
+        begin = subject.find_first_not_of(DN_DELIMS, end);
+    }
+    return domain;
+}
+
 }
 
 SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0) 
@@ -294,4 +322,25 @@ int SslSocket::getKeyLen() const
     return 0;
 }
 
+/** Identity of the SSL peer as "CN" or "CN@dc1.dc2..."; empty if there is no peer certificate or CN. */
+std::string SslSocket::getClientAuthId() const
+{
+    std::string authId;
+    CERTCertificate* cert = SSL_PeerCertificate(socket);
+    if (!cert) return authId;
+    // CERT_GetCommonName returns NULL when the subject has no CN, else a string we must free.
+    char* cn = CERT_GetCommonName(&(cert->subject));
+    if (cn) {
+        authId = cn;
+        PORT_Free(cn);
+        // CERT_GetDomainComponentName only returns the last domain component, so parse the subject.
+        if (cert->subjectName) {
+            const std::string domain = getDomainFromSubject(cert->subjectName);
+            if (!domain.empty()) authId += DOMAIN_SEPARATOR + domain;
+        }
+    }
+    CERT_DestroyCertificate(cert);
+    return authId;
+}
+
 }}} // namespace qpid::sys::ssl

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h Tue Mar 23 18:00:49 2010
@@ -101,6 +101,7 @@ public:
     int getError() const;
 
     int getKeyLen() const;
+    std::string getClientAuthId() const;
 
 private:
     mutable std::string connectname;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/xml/XmlExchange.cpp Tue Mar 23 18:00:49 2010
@@ -34,6 +34,10 @@
 
 #include <xercesc/framework/MemBufInputSource.hpp>
 
+#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
+#include <xqilla/ast/XQEffectiveBooleanValue.hpp>
+#endif
+
 #include <xqilla/ast/XQGlobalVariable.hpp>
 
 #include <xqilla/context/ItemFactory.hpp>
@@ -51,7 +55,7 @@ namespace qpid {
 namespace broker {
 
 
-    XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
+XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
 {
     if (mgmtExchange != 0)
         mgmtExchange->set_type (typeName);
@@ -180,7 +184,13 @@ bool XmlExchange::matches(Query& query, 
       }
 
       Result result = query->execute(context.get());
+#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
+      Item::Ptr first_ = result->next(context.get());
+      Item::Ptr second_ = result->next(context.get());
+      return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
+#else 
       return result->getEffectiveBooleanValue(context.get(), 0);
+#endif
   }
   catch (XQException& e) {
       QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/ClusterFixture.cpp Tue Mar 23 18:00:49 2010
@@ -130,11 +130,11 @@ void ClusterFixture::kill(size_t n, int 
         forkedBrokers[n]->kill(sig);
 }
 
-/** Kill a broker and suppressing errors from closing connection c. */
+/** Kill a broker and suppress errors from closing connection c. */
 void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) {
     ScopedSuppressLogging sl;
-    kill(n,sig);
     try { c.close(); } catch(...) {}
+    kill(n,sig);
 }
 
 /**

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Mar 23 18:00:49 2010
@@ -37,19 +37,19 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTes
 typedef InitialStatusMap::Status Status;
 
 Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
-    return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0,
+    return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(),
                   encodeMemberSet(ms));
 }
 
 Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
-    return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0,
+    return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(),
                   encodeMemberSet(ms));
 }
 
 Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(),
                    const MemberSet& ms=MemberSet())
 {
-    return Status(ProtocolVersion(), 0, active, start, state, stop, 0, 
+    return Status(ProtocolVersion(), 0, active, start, state, stop, 
                   encodeMemberSet(ms));
 }
 
@@ -173,20 +173,6 @@ QPID_AUTO_TEST_CASE(testInteveningConfig
     BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
-QPID_AUTO_TEST_CASE(testInitialSize) {
-    InitialStatusMap map(MemberId(0), 3);
-    map.configChange(list_of<MemberId>(0)(1));
-    map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), newcomerStatus());
-    BOOST_CHECK(!map.isComplete());
-
-    map.configChange(list_of<MemberId>(0)(1)(2));
-    map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), newcomerStatus());
-    map.received(MemberId(2), newcomerStatus());
-    BOOST_CHECK(map.isComplete());
-}
-
 QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) {
     InitialStatusMap map(MemberId(0), 3);
     map.configChange(list_of<MemberId>(0)(1)(2));
@@ -244,8 +230,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) {
     BOOST_CHECK(!map.isUpdateNeeded());
 }
 
-// FIXME aconway 2009-11-20: consistency tests for mixed stores,
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Makefile.am Tue Mar 23 18:00:49 2010
@@ -17,8 +17,6 @@
 # under the License.
 #
 
-SUBDIRS = . testagent
-
 AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK
 INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src
 PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only
@@ -381,3 +379,5 @@ python_prep:
 		--prefix=$(PYTHON_BLD_DIR) --install-lib=$(PYTHON_BLD_DIR) \
 		--install-scripts=$(PYTHON_BLD_DIR)/commands; \
 	else echo "WARNING: python client not built, missing $(PYTHON_SRC_DIR)"; fi
+
+include testagent.mk

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Mar 23 18:00:49 2010
@@ -336,6 +336,12 @@ QPID_AUTO_TEST_CASE(testMapMessage)
     MapContent content(out);
     content["abc"] = "def";
     content["pi"] = 3.14f;
+    Variant utf8("A utf 8 string");
+    utf8.setEncoding("utf8");
+    content["utf8"] = utf8;
+    Variant utf16("\x00\x61\x00\x62\x00\x63");
+    utf16.setEncoding("utf16");
+    content["utf16"] = utf16;
     content.encode();
     sender.send(out);
     Receiver receiver = fix.session.createReceiver(fix.queue);
@@ -343,6 +349,10 @@ QPID_AUTO_TEST_CASE(testMapMessage)
     MapView view(in);
     BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
     BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+    BOOST_CHECK_EQUAL(view["utf8"].asString(), utf8.asString());
+    BOOST_CHECK_EQUAL(view["utf8"].getEncoding(), utf8.getEncoding());
+    BOOST_CHECK_EQUAL(view["utf16"].asString(), utf16.asString());
+    BOOST_CHECK_EQUAL(view["utf16"].getEncoding(), utf16.getEncoding());
     fix.session.acknowledge();
 }
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/Variant.cpp Tue Mar 23 18:00:49 2010
@@ -178,6 +178,21 @@ QPID_AUTO_TEST_CASE(testIsEqualTo)
     BOOST_CHECK_EQUAL(a, b);
 }
 
+QPID_AUTO_TEST_CASE(testEncoding) // checks the encoding label on copies and map entries
+{
+    Variant a("abc");
+    a.setEncoding("utf8");
+    Variant b = a; // copy taken after the encoding was set
+    Variant map = Variant::Map();
+    map.asMap()["a"] = a; // store both variants as map entries
+    map.asMap()["b"] = b;
+    BOOST_CHECK_EQUAL(a.getEncoding(), std::string("utf8"));
+    BOOST_CHECK_EQUAL(a.getEncoding(), b.getEncoding()); // the copy reports the same encoding
+    BOOST_CHECK_EQUAL(a.getEncoding(), map.asMap()["a"].getEncoding()); // map entries match their sources
+    BOOST_CHECK_EQUAL(b.getEncoding(), map.asMap()["b"].getEncoding());
+    BOOST_CHECK_EQUAL(map.asMap()["a"].getEncoding(), map.asMap()["b"].getEncoding());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.fail Tue Mar 23 18:00:49 2010
@@ -1,3 +1,3 @@
-cluster_tests.LongTests.test_management
+
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org