You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC

svn commit: r926686 [2/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/ cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/ cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/ cpp/include/qpid/client/amqp0_10/ cpp/incl...

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt Tue Mar 23 18:00:49 2010
@@ -589,19 +589,6 @@ set_target_properties (qpidcommon PROPER
 install (TARGETS qpidcommon
          DESTINATION ${QPID_INSTALL_LIBDIR}
          COMPONENT ${QPID_COMPONENT_COMMON})
-# When the client programming component is installed, Windows also needs
-# the debug variant of qpidcommon and qpidclient, and the PDBs for those
-# as well. It would be nice to figure out a way to put some sanity checking
-# here... as it is, success relies on the packager building the debug then
-# the release, then packaging the release build.
-if (WIN32)
-    install (PROGRAMS
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.dll
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.lib
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.pdb
-             DESTINATION ${QPID_INSTALL_LIBDIR}
-             COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
 
 set (qpidclient_SOURCES
      ${rgen_client_srcs}
@@ -667,6 +654,7 @@ set (qpidclient_SOURCES
      qpid/client/amqp0_10/CodecsInternal.h
      qpid/client/amqp0_10/ConnectionImpl.h
      qpid/client/amqp0_10/ConnectionImpl.cpp
+     qpid/client/amqp0_10/FailoverUpdates.cpp
      qpid/client/amqp0_10/IncomingMessages.h
      qpid/client/amqp0_10/IncomingMessages.cpp
      qpid/client/amqp0_10/MessageSink.h
@@ -679,6 +667,8 @@ set (qpidclient_SOURCES
      qpid/client/amqp0_10/SessionImpl.cpp
      qpid/client/amqp0_10/SenderImpl.h
      qpid/client/amqp0_10/SenderImpl.cpp
+     qpid/client/amqp0_10/SimpleUrlParser.h
+     qpid/client/amqp0_10/SimpleUrlParser.cpp
 )
 
 add_library (qpidclient SHARED ${qpidclient_SOURCES})
@@ -699,14 +689,6 @@ if (NOT QPID_GENERATED_HEADERS_IN_SOURCE
            DESTINATION ${QPID_INSTALL_INCLUDEDIR}
            COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE})
 endif (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
-if (WIN32)
-    install (PROGRAMS
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.dll
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.lib
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.pdb
-             DESTINATION ${QPID_INSTALL_LIBDIR}
-             COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
 
 if (WIN32)
     set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
@@ -784,7 +766,8 @@ set (qpidbroker_SOURCES
      qpid/broker/TxPublish.cpp
      qpid/broker/Vhost.cpp
      qpid/management/ManagementAgent.cpp
-     qpid/management/ManagementExchange.cpp
+     qpid/management/ManagementDirectExchange.cpp
+     qpid/management/ManagementTopicExchange.cpp
      qpid/sys/TCPIOPlugin.cpp
 )
 add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
@@ -828,7 +811,7 @@ set (qmf_SOURCES
     qpid/agent/ManagementAgentImpl.h
     )
 add_library (qmf SHARED ${qmf_SOURCES})
-target_link_libraries (qmf qmfengine)
+target_link_libraries (qmf qpidclient)
 set_target_properties (qmf PROPERTIES
                        VERSION ${qmf_version})
 install (TARGETS qmf OPTIONAL
@@ -922,15 +905,6 @@ set_target_properties (qmfconsole PROPER
 install (TARGETS qmfconsole
          DESTINATION ${QPID_INSTALL_LIBDIR}
          COMPONENT ${QPID_COMPONENT_QMF})
-# On Windows, also grab the debug version of qmfconsole.
-if (WIN32)
-    install (PROGRAMS
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.dll
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.lib
-             ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.pdb
-             DESTINATION ${QPID_INSTALL_LIBDIR}
-             COMPONENT ${QPID_COMPONENT_QMF})
-endif (WIN32)
 
 # A queue event listener plugin that creates messages on a replication
 # queue corresponding to enqueue and dequeue events:
@@ -976,6 +950,5 @@ add_definitions(-DHAVE_CONFIG_H)
 # Now create the config file from all the info learned above.
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
                ${CMAKE_CURRENT_BINARY_DIR}/config.h)
-
 add_subdirectory(qpid/store)
 add_subdirectory(tests)

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am Tue Mar 23 18:00:49 2010
@@ -465,6 +465,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/sys/Runnable.cpp				\
   qpid/sys/ScopedIncrement.h			\
   qpid/sys/SecurityLayer.h			\
+  qpid/sys/SecuritySettings.h			\
   qpid/sys/Semaphore.h				\
   qpid/sys/Shlib.cpp				\
   qpid/sys/Shlib.h				\
@@ -637,8 +638,10 @@ libqpidbroker_la_SOURCES = \
   qpid/management/IdAllocator.h \
   qpid/management/ManagementAgent.cpp \
   qpid/management/ManagementAgent.h \
-  qpid/management/ManagementExchange.cpp \
-  qpid/management/ManagementExchange.h \
+  qpid/management/ManagementDirectExchange.cpp \
+  qpid/management/ManagementDirectExchange.h \
+  qpid/management/ManagementTopicExchange.cpp \
+  qpid/management/ManagementTopicExchange.h \
   qpid/sys/TCPIOPlugin.cpp
 
 
@@ -711,6 +714,7 @@ libqpidclient_la_SOURCES =			\
   qpid/messaging/Message.cpp			\
   qpid/messaging/MessageImpl.h			\
   qpid/messaging/MessageImpl.cpp		\
+  qpid/messaging/PrivateImplRef.h		\
   qpid/messaging/Sender.cpp			\
   qpid/messaging/Receiver.cpp			\
   qpid/messaging/Session.cpp			\
@@ -728,6 +732,7 @@ libqpidclient_la_SOURCES =			\
   qpid/client/amqp0_10/CodecsInternal.h		\
   qpid/client/amqp0_10/ConnectionImpl.h	        \
   qpid/client/amqp0_10/ConnectionImpl.cpp	\
+  qpid/client/amqp0_10/FailoverUpdates.cpp	\
   qpid/client/amqp0_10/IncomingMessages.h	\
   qpid/client/amqp0_10/IncomingMessages.cpp	\
   qpid/client/amqp0_10/MessageSink.h		\
@@ -739,7 +744,9 @@ libqpidclient_la_SOURCES =			\
   qpid/client/amqp0_10/SessionImpl.h		\
   qpid/client/amqp0_10/SessionImpl.cpp		\
   qpid/client/amqp0_10/SenderImpl.h		\
-  qpid/client/amqp0_10/SenderImpl.cpp
+  qpid/client/amqp0_10/SenderImpl.cpp           \
+  qpid/client/amqp0_10/SimpleUrlParser.h	\
+  qpid/client/amqp0_10/SimpleUrlParser.cpp
 
 # NOTE: only public header files (which should be in ../include)
 # should go in this list. Private headers should go in the SOURCES
@@ -812,20 +819,23 @@ nobase_include_HEADERS +=			\
   ../include/qpid/sys/Thread.h			\
   ../include/qpid/sys/Time.h			\
   ../include/qpid/messaging/Address.h 		\
-  ../include/qpid/messaging/Connection.h 	\
   ../include/qpid/messaging/Codec.h 	        \
+  ../include/qpid/messaging/Connection.h 	\
   ../include/qpid/messaging/Duration.h 	        \
+  ../include/qpid/messaging/Handle.h		\
+  ../include/qpid/messaging/ImportExport.h	\
   ../include/qpid/messaging/ListContent.h 	\
   ../include/qpid/messaging/ListView.h 		\
   ../include/qpid/messaging/MapContent.h 	\
   ../include/qpid/messaging/MapView.h 		\
   ../include/qpid/messaging/Message.h 		\
-  ../include/qpid/messaging/Sender.h 		\
   ../include/qpid/messaging/Receiver.h 	        \
+  ../include/qpid/messaging/Sender.h 		\
   ../include/qpid/messaging/Session.h 		\
   ../include/qpid/messaging/Uuid.h 		\
   ../include/qpid/messaging/Variant.h 		\
-  ../include/qpid/client/amqp0_10/Codecs.h
+  ../include/qpid/client/amqp0_10/Codecs.h	\
+  ../include/qpid/client/amqp0_10/FailoverUpdates.h
 
 # Force build of qpidd during dist phase so help2man will work.
 dist-hook: $(BUILT_SOURCES)

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h Tue Mar 23 18:00:49 2010
@@ -23,10 +23,11 @@
 
 #ifdef HAVE_CONFIG_H
 #  include "config.h"
+#else
+#  error "config.h not generated"
 #endif
 
 namespace qpid {
-#ifdef HAVE_CONFIG_H
   const std::string product  = PACKAGE_NAME;
   const std::string version  = PACKAGE_VERSION;
 #  if HAVE_SASL
@@ -34,11 +35,6 @@ namespace qpid {
 #  else
   const std::string saslName = "qpidd-no-sasl";
 #  endif
-#else
-  const std::string product  = "qpidc";
-  const std::string version  = "0.7";
-  const std::string saslName = "qpid-broker";
-#endif
 }
 
 #endif  /*!QPID_VERSION_H*/

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp Tue Mar 23 18:00:49 2010
@@ -35,7 +35,8 @@
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolInitiation.h"
@@ -234,11 +235,22 @@ Broker::Broker(const Broker::Options& co
     declareStandardExchange(amq_match, HeadersExchange::typeName);
 
     if(conf.enableMgmt) {
-        exchanges.declare(qpid_management, ManagementExchange::typeName);
-        Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
-        Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+        exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+        Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
+        Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
         managementAgent->setExchange(mExchange, dExchange);
-        boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get());
+        boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
+
+        std::string qmfTopic("qmf.default.topic");
+        std::string qmfDirect("qmf.default.direct");
+
+        std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
+        std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+
+        boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
+        boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
+
+        managementAgent->setExchangeV2(topicPair.first, directPair.first);
     }
     else
         QPID_LOG(info, "Management not enabled");

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -23,6 +23,7 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/sys/SecuritySettings.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
@@ -72,9 +73,10 @@ struct ConnectionTimeoutTask : public sy
     }
 };
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId, bool shadow_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_,
+                       const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
     ConnectionState(out_, broker_),
-    ssf(ssf),
+    securitySettings(external),
     adapter(*this, isLink_),
     isLink(isLink_),
     mgmtClosing(false),
@@ -99,7 +101,7 @@ Connection::Connection(ConnectionOutputH
         if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
             mgmtObject->set_shadow(shadow);
-            agent->addObject(mgmtObject, objectId, true);
+            agent->addObject(mgmtObject, objectId);
         }
         ConnectionState::setUrl(mgmtId);
     }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h Tue Mar 23 18:00:49 2010
@@ -45,6 +45,7 @@
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/TimeoutHandler.h"
 #include "qpid/sys/Mutex.h"
@@ -78,7 +79,8 @@ class Connection : public sys::Connectio
         virtual void connectionError(const std::string&) = 0;
     };
 
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, unsigned int ssf,
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId,
+               const qpid::sys::SecuritySettings&,
                bool isLink = false, uint64_t objectId = 0, bool shadow=false);
     ~Connection ();
 
@@ -136,14 +138,17 @@ class Connection : public sys::Connectio
     // Used by cluster to update connection status
     sys::AggregateOutput& getOutputTasks() { return outputTasks; }
 
-    unsigned int getSSF() { return ssf; }
+    const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
+    { 
+        return securitySettings;
+    }
 
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
     ChannelMap channels;
-    unsigned int ssf;
+    qpid::sys::SecuritySettings securitySettings;
     ConnectionHandler adapter;
     const bool isLink;
     bool mgmtClosing;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Mar 23 18:00:49 2010
@@ -22,12 +22,14 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
 
 using framing::ProtocolVersion;
+using qpid::sys::SecuritySettings;
 typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr;
 typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
 
@@ -37,7 +39,7 @@ ConnectionFactory::~ConnectionFactory() 
 
 sys::ConnectionCodec*
 ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
-                          unsigned int ) {
+                          const SecuritySettings& external) {
     if (broker.getConnectionCounter().allowConnection())
     {
         QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
@@ -45,7 +47,7 @@ ConnectionFactory::create(ProtocolVersio
     }
     if (v == ProtocolVersion(0, 10)) {
         ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
-        c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
+        c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
         return c.release();
     }
     return 0;
@@ -53,10 +55,10 @@ ConnectionFactory::create(ProtocolVersio
 
 sys::ConnectionCodec*
 ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
-                          unsigned int) {
+                          const SecuritySettings& external) {
     // used to create connections from one broker to another
     ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
-    c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true)));
+    c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, true)));
     return c.release();
 }
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h Tue Mar 23 18:00:49 2010
@@ -36,11 +36,10 @@ class ConnectionFactory : public sys::Co
 
     sys::ConnectionCodec*
     create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
-           unsigned int conn_ssf);
+           const qpid::sys::SecuritySettings&);
 
     sys::ConnectionCodec*
-    create(sys::OutputControl&, const std::string& id,
-           unsigned int conn_ssf);
+    create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&);
 
   private:
     Broker& broker;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Mar 23 18:00:49 2010
@@ -24,7 +24,8 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
 
 using namespace qpid::broker;
@@ -52,8 +53,10 @@ pair<Exchange::shared_ptr, bool> Exchang
             exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
         }else if (type == HeadersExchange::typeName) {
             exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
-        }else if (type == ManagementExchange::typeName) {
-            exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
+        }else if (type == ManagementDirectExchange::typeName) {
+            exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+        }else if (type == ManagementTopicExchange::typeName) {
+            exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Mar 23 18:00:49 2010
@@ -252,7 +252,7 @@ namespace 
 {
 
     bool match_values(const FieldValue& bind, const FieldValue& msg) {
-        return  bind.empty() || bind == msg;
+        return  bind.getType() == 0xf0 || bind == msg;
     }
 
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Tue Mar 23 18:00:49 2010
@@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessage
 
 IncompleteMessageList::~IncompleteMessageList() 
 {
-    sys::Mutex::ScopedLock l(lock);
+    //  No lock here. We are relying on Message::reset*CompleteCallback
+    //  to ensure no callbacks are in progress before they return.
     for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) {
         (*i)->resetEnqueueCompleteCallback();
         (*i)->resetDequeueCompleteCallback();
@@ -78,7 +79,7 @@ void IncompleteMessageList::each(const C
         sys::Mutex::ScopedLock l(lock);
         snapshot = incomplete;
     }
-    std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
+    std::for_each(incomplete.begin(), incomplete.end(), listen);
 }
 
 }}

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp Tue Mar 23 18:00:49 2010
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+    inCallback(false), requiredCredit(0) {}
 
 Message::~Message()
 {
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boos
     replacement[qfor] = msg;
 }
 
+namespace {
+struct ScopedSet {
+    sys::Monitor& lock;
+    bool& flag;
+    ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+        sys::Monitor::ScopedLock sl(lock);
+        flag = true;
+    }
+    ~ScopedSet(){
+        sys::Monitor::ScopedLock sl(lock);
+        flag = false;
+        lock.notifyAll();
+    }
+};
+}
+
 void Message::allEnqueuesComplete() {
-    sys::Mutex::ScopedLock l(callbackLock);
+    ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = enqueueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::allDequeuesComplete() {
-    sys::Mutex::ScopedLock l(callbackLock);
+    ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     enqueueCallback = &cb;
 }
 
 void Message::resetEnqueueCompleteCallback() {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     enqueueCallback = 0;
 }
 
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     dequeueCallback = &cb;
 }
 
 void Message::resetDequeueCompleteCallback() {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     dequeueCallback = 0;
 }
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h Tue Mar 23 18:00:49 2010
@@ -26,7 +26,7 @@
 #include "qpid/broker/PersistableMessage.h"
 #include "qpid/broker/MessageAdapter.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
@@ -189,9 +189,10 @@ public:
     mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
 
-    sys::Mutex callbackLock;
+    sys::Monitor callbackLock;
     MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
+    bool inCallback;
 
     uint32_t requiredCredit;
 };

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Mar 23 18:00:49 2010
@@ -26,6 +26,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/SecuritySettings.h"
 #include <boost/format.hpp>
 
 #if HAVE_SASL
@@ -36,6 +37,7 @@ using qpid::sys::cyrus::CyrusSecurityLay
 
 using namespace qpid::framing;
 using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
 using boost::format;
 using boost::str;
 
@@ -152,7 +154,8 @@ void NullAuthenticator::start(const stri
 #if HAVE_SASL
         // encryption required - check to see if we are running over an
         // encrypted SSL connection.
-        sasl_ssf_t external_ssf = (sasl_ssf_t) connection.getSSF();
+        SecuritySettings external = connection.getExternalSecuritySettings();
+        sasl_ssf_t external_ssf = (sasl_ssf_t) external.ssf;
         if (external_ssf < 1)    // < 1 == unencrypted
 #endif
         {
@@ -244,7 +247,9 @@ void CyrusAuthenticator::init()
 
     // If the transport provides encryption, notify the SASL library of
     // the key length and set the ssf range to prevent double encryption.
-    sasl_ssf_t external_ssf = (sasl_ssf_t) connection.getSSF();
+    SecuritySettings external = connection.getExternalSecuritySettings();
+    QPID_LOG(debug, "External ssf=" << external.ssf << " and auth=" << external.authid);
+    sasl_ssf_t external_ssf = (sasl_ssf_t) external.ssf;
     if (external_ssf) {
         int result = sasl_setprop(sasl_conn, SASL_SSF_EXTERNAL, &external_ssf);
         if (result != SASL_OK) {
@@ -258,16 +263,25 @@ void CyrusAuthenticator::init()
              ", max_ssf: " << secprops.max_ssf <<
              ", external_ssf: " << external_ssf );
 
+    if (!external.authid.empty()) {
+        const char* external_authid = external.authid.c_str();
+        int result = sasl_setprop(sasl_conn, SASL_AUTH_EXTERNAL, external_authid);
+        if (result != SASL_OK) {
+            throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external auth: " << result));
+        }
+
+        QPID_LOG(debug, "external auth detected and set to " << external_authid);
+    }
+
     secprops.maxbufsize = 65535;
     secprops.property_names = 0;
     secprops.property_values = 0;
     secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
-    
+    if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;    
     int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
     if (result != SASL_OK) {
         throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
     }
-
 }
 
 CyrusAuthenticator::~CyrusAuthenticator()

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Mar 23 18:00:49 2010
@@ -23,12 +23,14 @@
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/SecureConnection.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
 
 using framing::ProtocolVersion;
+using qpid::sys::SecuritySettings;
 typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
 typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
 typedef std::auto_ptr<Connection> ConnectionPtr;
@@ -38,7 +40,7 @@ SecureConnectionFactory::SecureConnectio
 
 sys::ConnectionCodec*
 SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
-                                unsigned int conn_ssf ) {
+                                const SecuritySettings& external) {
     if (broker.getConnectionCounter().allowConnection())
     {
         QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
@@ -47,7 +49,7 @@ SecureConnectionFactory::create(Protocol
     if (v == ProtocolVersion(0, 10)) {
         SecureConnectionPtr sc(new SecureConnection());
         CodecPtr c(new amqp_0_10::Connection(out, id, false));
-        ConnectionPtr i(new broker::Connection(c.get(), broker, id, conn_ssf, false));
+        ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
         i->setSecureConnection(sc.get());
         c->setInputHandler(InputPtr(i.release()));
         sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
@@ -58,11 +60,11 @@ SecureConnectionFactory::create(Protocol
 
 sys::ConnectionCodec*
 SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
-                                unsigned int conn_ssf) {
+                                const SecuritySettings& external) {
     // used to create connections from one broker to another
     SecureConnectionPtr sc(new SecureConnection());
     CodecPtr c(new amqp_0_10::Connection(out, id, true));
-    ConnectionPtr i(new broker::Connection(c.get(), broker, id, conn_ssf, true ));
+    ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
     i->setSecureConnection(sc.get());
     c->setInputHandler(InputPtr(i.release()));
     sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h Tue Mar 23 18:00:49 2010
@@ -34,11 +34,10 @@ class SecureConnectionFactory : public s
 
     sys::ConnectionCodec*
     create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
-           unsigned int conn_ssf);
+           const qpid::sys::SecuritySettings&);
 
     sys::ConnectionCodec*
-    create(sys::OutputControl&, const std::string& id,
-           unsigned int conn_ssf);
+    create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&);
 
   private:
     Broker& broker;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Mar 23 18:00:49 2010
@@ -213,7 +213,7 @@ void ConnectionHandler::start(const Fiel
 
     if (sasl.get()) {
         string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
-                                      getSSF ? getSSF() : 0);
+                                      getSecuritySettings ? getSecuritySettings() : 0);
         proxy.startOk(properties, sasl->getMechanism(), response, locale);
     } else {
         //TODO: verify that desired mechanism and locale are supported

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h Tue Mar 23 18:00:49 2010
@@ -40,6 +40,11 @@
 #include <memory>
 
 namespace qpid {
+
+namespace sys {
+struct SecuritySettings;
+}
+
 namespace client {
 
 class ConnectionHandler : private StateManager,
@@ -95,7 +100,7 @@ public:
     using InputHandler::handle;
     typedef boost::function<void()> CloseListener;    
     typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
-    typedef boost::function<unsigned int()> GetConnSSF;
+    typedef boost::function<const qpid::sys::SecuritySettings*()> GetSecuritySettings;
 
     ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
 
@@ -123,7 +128,7 @@ public:
 
     static framing::connection::CloseCode convert(uint16_t replyCode);
     const std::string& getUserId() const { return operUserId; }
-    GetConnSSF  getSSF;     /** query the connection for its security strength factor */
+    GetSecuritySettings  getSecuritySettings;     /** query the transport for its security details */
 };
 
 }}

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -134,6 +134,15 @@ IOThread& theIO() {
     return io;
 }
 
+// Bring theIO into existence on library load rather than on first use.
+// This avoids it being destroyed whilst something in the main program
+// still exists.
+struct InitAtLoad {
+    InitAtLoad() {
+        (void) theIO(); // result deliberately discarded; only the call itself matters
+    }
+} init; // the one instance; constructing it performs the theIO() call
+
 class HeartbeatTask : public TimerTask {
     TimeoutHandler& timeout;
 
@@ -165,7 +174,7 @@ ConnectionImpl::ConnectionImpl(framing::
                                   CLOSE_CODE_NORMAL, std::string());
     //only set error handler once  open
     handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
-    handler.getSSF = boost::bind(&Connector::getSSF, boost::ref(connector));
+    handler.getSecuritySettings = boost::bind(&Connector::getSecuritySettings, boost::ref(connector));
 }
 
 const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max();
@@ -195,7 +204,8 @@ void ConnectionImpl::addSession(const bo
             throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId()));
         } //else channel is busy, but we can keep looking for a free one
     }
-
+    // If we get here, we didn't find any available channel.
+    throw ResourceLimitExceededException("There are no channels available");
 }
 
 void ConnectionImpl::handle(framing::AMQFrame& frame)

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h Tue Mar 23 18:00:49 2010
@@ -35,6 +35,7 @@ namespace sys {
 class ShutdownHandler;
 class SecurityLayer;
 class Poller;
+struct SecuritySettings;
 }
 
 namespace framing {
@@ -74,7 +75,7 @@ class Connector : public framing::Output
 
     virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
 
-    virtual unsigned int getSSF() = 0;
+    virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
 };
 
 }}

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Mar 23 18:00:49 2010
@@ -109,7 +109,7 @@ class RdmaConnector : public Connector, 
     framing::OutputHandler* getOutputHandler();
     const std::string& getIdentifier() const;
     void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
-    unsigned int getSSF() { return 0; }
+    const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }
 
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h Tue Mar 23 18:00:49 2010
@@ -30,6 +30,7 @@ namespace qpid {
 
 namespace sys {
 class SecurityLayer;
+struct SecuritySettings;
 }
 
 namespace client {
@@ -48,17 +49,10 @@ class Sasl
      *
      * @param mechanisms Comma-separated list of the SASL mechanism the
      *             client supports.
-     * @param ssf  Security Strength Factor (SSF). SSF is used to negotiate
-     *             a SASL security layer on top of the connection should both
-     *             parties require and support it. The value indicates the
-     *             required level of security for communication. Possible
-     *             values are:
-     *             @li 0  No security
-     *             @li 1  Integrity checking only
-     *             @li >1 Integrity and confidentiality with the number
-     *                    giving the encryption key length.
+     * @param externalSecuritySettings security-related details from the underlying transport; may be null (the default) when the transport supplies none
      */
-    virtual std::string start(const std::string& mechanisms, unsigned int ssf) = 0;
+    virtual std::string start(const std::string& mechanisms,
+                              const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
     virtual std::string step(const std::string& challenge) = 0;
     virtual std::string getMechanism() = 0;
     virtual std::string getUserId() = 0;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp Tue Mar 23 18:00:49 2010
@@ -61,6 +61,7 @@ std::auto_ptr<SaslFactory> SaslFactory::
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/sys/cyrus/CyrusSecurityLayer.h"
 #include "qpid/log/Statement.h"
 #include <sasl/sasl.h>
@@ -70,6 +71,7 @@ namespace qpid {
 namespace client {
 
 using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
 using qpid::sys::cyrus::CyrusSecurityLayer;
 using qpid::framing::InternalErrorException;
 
@@ -80,7 +82,7 @@ class CyrusSasl : public Sasl
   public:
     CyrusSasl(const ConnectionSettings&);
     ~CyrusSasl();
-    std::string start(const std::string& mechanisms, unsigned int ssf);
+    std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
     std::string step(const std::string& challenge);
     std::string getMechanism();
     std::string getUserId();
@@ -176,7 +178,7 @@ namespace {
     const std::string SSL("ssl");
 }
 
-std::string CyrusSasl::start(const std::string& mechanisms, unsigned int ssf)
+std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings)
 {
     QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
     int result = sasl_client_new(settings.service.c_str(),
@@ -190,14 +192,22 @@ std::string CyrusSasl::start(const std::
 
     sasl_security_properties_t secprops;
 
-    if (ssf) {
-        sasl_ssf_t external_ssf = (sasl_ssf_t) ssf;
+    if (externalSettings) {
+        sasl_ssf_t external_ssf = (sasl_ssf_t) externalSettings->ssf;
         if (external_ssf) {
             int result = sasl_setprop(conn, SASL_SSF_EXTERNAL, &external_ssf);
             if (result != SASL_OK) {
                 throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external SSF: " << result));
             }
-            QPID_LOG(debug, "external SSF detected and set to " << ssf);
+            QPID_LOG(debug, "external SSF detected and set to " << external_ssf);
+        }
+        if (externalSettings->authid.size()) {
+            const char* external_authid = externalSettings->authid.c_str();
+            result = sasl_setprop(conn, SASL_AUTH_EXTERNAL, external_authid);
+            if (result != SASL_OK) {
+                throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external auth: " << result));
+            }
+            QPID_LOG(debug, "external auth detected and set to " << external_authid);
         }
     }
 
@@ -216,7 +226,6 @@ std::string CyrusSasl::start(const std::
         throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn)));
     }
 
-
     sasl_interact_t* client_interact = 0;
     const char *out = 0;
     unsigned outlen = 0;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Mar 23 18:00:49 2010
@@ -34,6 +34,7 @@
 #include "qpid/sys/ssl/SslSocket.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/Msg.h"
 
 #include <iostream>
@@ -86,6 +87,7 @@ class SslConnector : public Connector
     const uint16_t maxFrameSize;
     framing::ProtocolVersion version;
     bool initiated;
+    SecuritySettings securitySettings;
 
     sys::Mutex closedLock;
     bool closed;
@@ -125,7 +127,7 @@ class SslConnector : public Connector
     sys::ShutdownHandler* getShutdownHandler() const;
     framing::OutputHandler* getOutputHandler();
     const std::string& getIdentifier() const;
-    unsigned int getSSF() { return socket.getKeyLen(); }
+    const SecuritySettings* getSecuritySettings();
 
 public:
     SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
@@ -366,4 +368,11 @@ void SslConnector::eof(SslIO&) {
     handleClosed();
 }
 
+const SecuritySettings* SslConnector::getSecuritySettings() // refreshes the member securitySettings and returns its address
+{
+    securitySettings.ssf = socket.getKeyLen(); // ssf is taken from the socket's reported key length
+    securitySettings.authid = "dummy";//placeholder value: set to non-empty string to enable external authentication
+    return &securitySettings; // pointer to the member itself, not to a copy
+}
+
 }} // namespace qpid::client

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp Tue Mar 23 18:00:49 2010
@@ -199,8 +199,19 @@ void TCPConnector::send(AMQFrame& frame)
     } else {
         notifyWrite = (currentSize >= maxFrameSize);
     }
-    }
+    /*
+      NOTE: Moving the following line into this mutex block
+            is a workaround for BZ 570168, in which the test
+            testConcurrentSenders causes a hang about 1.5%
+            of the time.  ( To see the hang much more frequently
+            leave this line out of the mutex block, and put a 
+            small usleep just before it.)
+
+            TODO mgoulish - fix the underlying cause and then
+                            move this call back outside the mutex.
+    */
     if (notifyWrite && !closed) aio->notifyPendingWrite();
+    }
 }
 
 void TCPConnector::handleClosed() {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h Tue Mar 23 18:00:49 2010
@@ -92,7 +92,7 @@ class TCPConnector : public Connector, p
     framing::OutputHandler* getOutputHandler();
     const std::string& getIdentifier() const;
     void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
-    unsigned int getSSF() { return 0; }
+    const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }
 
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Tue Mar 23 18:00:49 2010
@@ -25,8 +25,10 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/List.h"
+#include "qpid/log/Statement.h"
 #include <algorithm>
 #include <functional>
+#include <limits>
 
 using namespace qpid::framing;
 using namespace qpid::messaging;
@@ -39,6 +41,7 @@ namespace {
 const std::string iso885915("iso-8859-15");
 const std::string utf8("utf8");
 const std::string utf16("utf16");
+const std::string binary("binary");
 const std::string amqp0_10_binary("amqp0-10:binary");
 const std::string amqp0_10_bit("amqp0-10:bit");
 const std::string amqp0_10_datetime("amqp0-10:datetime");
@@ -129,7 +132,7 @@ Variant toVariant(boost::shared_ptr<Fiel
       case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
       case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
       case 0x04: break; //TODO: iso-8859-15 char
-      case 0x08: out = in->getIntegerValue<bool, 1>(); break;
+      case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;
       case 0x10: out.setEncoding(amqp0_10_binary);
       case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;
       case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;
@@ -163,8 +166,8 @@ Variant toVariant(boost::shared_ptr<Fiel
       case 0x96: 
       case 0xa0:
       case 0xab:
-        setEncodingFor(out, in->getType());
         out = in->get<std::string>();
+        setEncodingFor(out, in->getType());
         break;
 
       case 0xa8:
@@ -188,6 +191,28 @@ Variant toVariant(boost::shared_ptr<Fiel
     return out;
 }
 
+boost::shared_ptr<FieldValue> convertString(const std::string& value, const std::string& encoding) // chooses the FieldValue type for a string from its encoding and size
+{
+    bool large = value.size() > std::numeric_limits<uint16_t>::max(); // true when the size exceeds the uint16_t maximum
+    if (encoding.empty() || encoding == amqp0_10_binary || encoding == binary) { // no encoding given, or an explicitly binary one
+        if (large) {
+            return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0));
+        } else {
+            return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x90));
+        }
+    } else if (encoding == utf8 && !large) { // the named text encodings below are only used when the value is not large
+            return boost::shared_ptr<FieldValue>(new Str16Value(value));
+    } else if (encoding == utf16 && !large) {
+        return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x96));
+    } else if (encoding == iso885915 && !large) {
+        return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x94));
+    } else {
+        //either the string is too large for the encoding in amqp 0-10, or the encoding was not recognised
+        QPID_LOG(warning, "Could not encode " << value.size() << " byte value as " << encoding << ", encoding as vbin32.");
+        return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0)); // same type code as the large binary case above
+    }
+}
+
 boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
 {
     boost::shared_ptr<FieldValue> out;
@@ -204,8 +229,7 @@ boost::shared_ptr<FieldValue> toFieldVal
       case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
       case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
       case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
-        //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
-      case VAR_STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+      case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break;
       case VAR_UUID: out = boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); break;
       case VAR_MAP: 
         out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-/*
+ /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,8 +20,9 @@
  */
 #include "ConnectionImpl.h"
 #include "SessionImpl.h"
+#include "SimpleUrlParser.h"
 #include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include <boost/intrusive_ptr.hpp>
@@ -33,13 +34,42 @@ namespace amqp0_10 {
 
 using qpid::messaging::Variant;
 using qpid::framing::Uuid;
-using namespace qpid::sys;
 
-template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void convert(const Variant::List& from, std::vector<std::string>& to) // copies each list element into 'to' as a string
+{
+    for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+        to.push_back(i->asString()); // appended, so anything already in 'to' is kept
+    }
+}
+
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value) // returns whether 'key' was present in 'map'
 {
     Variant::Map::const_iterator i = map.find(key);
     if (i != map.end()) {
         value = (T) i->second;
+        QPID_LOG(debug, "option " << key << " specified as " << i->second);
+        return true; // 'value' has been overwritten with the converted option
+    } else {
+        QPID_LOG(debug, "option " << key << " not specified");
+        return false; // 'value' is left untouched
+    }
+}
+
+template <> 
+bool setIfFound< std::vector<std::string> >(const Variant::Map& map, // specialization used when the target is a vector of strings
+                                                   const std::string& key,
+                                                   std::vector<std::string>& value)
+{
+    Variant::Map::const_iterator i = map.find(key);
+    if (i != map.end()) {
+        if (i->second.getType() == qpid::messaging::VAR_LIST) {
+            convert(i->second.asList(), value); // a list contributes one entry per element
+        } else {
+            value.push_back(i->second.asString()); // any other type contributes a single entry
+        }
+        return true; // found: entries were appended to 'value', not substituted for it
+    } else {
+        return false; // not found: 'value' is left untouched
     }
 }
 
@@ -59,24 +89,47 @@ void convert(const Variant::Map& from, C
     setIfFound(from, "max-channels", to.maxChannels);
     setIfFound(from, "max-frame-size", to.maxFrameSize);
     setIfFound(from, "bounds", to.bounds);
+
+    setIfFound(from, "protocol", to.protocol);
 }
 
 ConnectionImpl::ConnectionImpl(const Variant::Map& options) : 
-    reconnectionEnabled(true), timeout(-1),
-    minRetryInterval(1), maxRetryInterval(30)
+    reconnect(true), timeout(-1), limit(-1), // defaults: reconnect enabled, negative timeout and limit
+    minReconnectInterval(3), maxReconnectInterval(60),
+    retries(0)
+{
+    QPID_LOG(debug, "Created connection with " << options);
+    setOptions(options); // entries present in 'options' replace the defaults set above
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options) // applies the recognised entries of 'options' to this connection
 {
-    QPID_LOG(debug, "Opening connection to " << url << " with " << options);
     convert(options, settings);
-    setIfFound(options, "reconnection-enabled", reconnectionEnabled);
-    setIfFound(options, "reconnection-timeout", timeout);
-    setIfFound(options, "min-retry-interval", minRetryInterval);
-    setIfFound(options, "max-retry-interval", maxRetryInterval);
+    setIfFound(options, "reconnect", reconnect);
+    setIfFound(options, "reconnect-timeout", timeout);
+    setIfFound(options, "reconnect-limit", limit);
+    int64_t reconnectInterval; // left uninitialised; only read when the lookup below succeeds
+    if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+        minReconnectInterval = maxReconnectInterval = reconnectInterval; // a single interval fixes both bounds
+    } else {
+        setIfFound(options, "min-reconnect-interval", minReconnectInterval);
+        setIfFound(options, "max-reconnect-interval", maxReconnectInterval);
+    }
+    setIfFound(options, "urls", urls);    
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value) // sets a single option by name
+{
+    Variant::Map options;
+    options[name] = value;
+    setOptions(options); // reuses the map-based path with a one-entry map
+    QPID_LOG(debug, "Set " << name << " to " << value);
 }
 
 void ConnectionImpl::open(const std::string& u)
 {
-    url = u;
-    connection.open(url, settings);
+    urls.push_back(u); // added after any urls already held
+    connect(); // the connection attempt itself is made by connect()
 }
 
 void ConnectionImpl::close()
@@ -97,7 +150,7 @@ void ConnectionImpl::close()
 boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
 {
     return boost::dynamic_pointer_cast<SessionImpl>(
-        qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session)
+        qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session) // fetch the handle's implementation, which is then cast to SessionImpl
     );
 }
 
@@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl:
     try {
         getImplPtr(impl)->setSession(connection.newSession(name));
     } catch (const TransportFailure&) {
-        reconnect();
+        connect();
     }
     return impl;
 }
 
-void ConnectionImpl::reconnect()
+void ConnectionImpl::connect()
 {
-    AbsTime start = now();
-    ScopedLock<Semaphore> l(semaphore);
+    qpid::sys::AbsTime start = qpid::sys::now(); // start time is recorded before the semaphore is acquired
+    qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); // held for the rest of this function
     if (!connection.isOpen()) connect(start);
 }
 
-bool expired(const AbsTime& start, int timeout)
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout) // 0 means already expired, negative means never; otherwise scaled by TIME_SEC
 {
     if (timeout == 0) return true;
     if (timeout < 0) return false;
-    Duration used(start, now());
-    Duration allowed = timeout * TIME_SEC;
-    return allowed > used;
+    qpid::sys::Duration used(start, qpid::sys::now());
+    qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+    return allowed < used; // expired once the time used exceeds the time allowed
 }
 
-void ConnectionImpl::connect(const AbsTime& started)
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
 {
-    for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
-        if (expired(started, timeout)) throw TransportFailure();
+    for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { // wait doubles after each failure, capped at the maximum
+        if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
+        if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); // with a negative limit, retries is neither checked nor incremented
+        if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
         else qpid::sys::sleep(i);
     }
+    retries = 0; // reached only after tryConnect() has succeeded
 }
 
 bool ConnectionImpl::tryConnect()
 {
-    if (tryConnect(url) ||
-        (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
-    {
-        return resetSessions();
-    } else {
-        return false;
-    }
+    if (tryConnect(urls)) return resetSessions(); // once connected, the result is whatever resetSessions() reports
+    else return false; // none of the urls could be opened
 }
 
-bool ConnectionImpl::tryConnect(const Url& u)
+bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) // tries each url in order; true on the first that opens
 {
-    try {
-        QPID_LOG(info, "Trying to connect to " << url << "...");                
-        connection.open(u, settings);
-        failoverListener.reset(new FailoverListener(connection));
-        return true;
-    } catch (const Exception& e) {
-        //TODO: need to fix timeout on open so that it throws TransportFailure
-        QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());                
-    }
-    return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
-{
-    for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
-        if (tryConnect(*i)) return true;
+    for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+        try {
+            QPID_LOG(info, "Trying to connect to " << *i << "...");
+            //TODO: when url support is more complete can avoid this test here
+            if (i->find("amqp:") == 0) { // i.e. the string begins with "amqp:"
+                Url url(*i);
+                connection.open(url, settings);
+            } else {
+                SimpleUrlParser::parse(*i, settings); // presumably fills 'settings' from the url text - confirm against SimpleUrlParser
+                connection.open(settings);
+            }
+            QPID_LOG(info, "Connected to " << *i);                
+            return true;
+        } catch (const Exception& e) {
+            //TODO: need to fix timeout on
+            //qpid::client::Connection::open() so that it throws
+            //TransportFailure rather than a ConnectionException
+            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); // failure is logged and the next url is tried
+        }
     }
     return false;
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Mar 23 18:00:49 2010
@@ -25,7 +25,6 @@
 #include "qpid/messaging/Variant.h"
 #include "qpid/Url.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/FailoverListener.h"
 #include "qpid/client/ConnectionSettings.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Semaphore.h"
@@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::mess
     qpid::messaging::Session newSession(bool transactional, const std::string& name);
     qpid::messaging::Session getSession(const std::string& name) const;
     void closed(SessionImpl&);
-    void reconnect();
+    void connect();
+    void setOption(const std::string& name, const qpid::messaging::Variant& value);
   private:
     typedef std::map<std::string, qpid::messaging::Session> Sessions;
 
@@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::mess
     qpid::sys::Semaphore semaphore;//used to coordinate reconnection
     Sessions sessions;
     qpid::client::Connection connection;
-    std::auto_ptr<FailoverListener> failoverListener;
-    qpid::Url url;
+    std::vector<std::string> urls;
     qpid::client::ConnectionSettings settings;
-    bool reconnectionEnabled;
-    int timeout;
-    int minRetryInterval;
-    int maxRetryInterval;
+    bool reconnect;
+    int64_t timeout;
+    int32_t limit;
+    int64_t minReconnectInterval;
+    int64_t maxReconnectInterval;
+    int32_t retries;
 
+    void setOptions(const qpid::messaging::Variant::Map& options);
     void connect(const qpid::sys::AbsTime& started);
     bool tryConnect();
-    bool tryConnect(const std::vector<Url>& urls);
-    bool tryConnect(const Url&);
+    bool tryConnect(const std::vector<std::string>& urls);
     bool resetSessions();
 };
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Tue Mar 23 18:00:49 2010
@@ -57,14 +57,14 @@ qpid::messaging::Message ReceiverImpl::f
 bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
     Get f(*this, message, timeout);
-    while (!parent.execute(f)) {}
+    while (!parent->execute(f)) {}
     return f.result;
 }
 
 bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
     Fetch f(*this, message, timeout);
-    while (!parent.execute(f)) {}
+    while (!parent->execute(f)) {}
     return f.result;
 }
 
@@ -112,7 +112,7 @@ void ReceiverImpl::init(qpid::client::As
     }
     if (state == CANCELLED) {
         source->cancel(session, destination);
-        parent.receiverCancelled(destination);        
+        parent->receiverCancelled(destination);        
     } else {
         source->subscribe(session, destination);
         start();
@@ -129,23 +129,23 @@ uint32_t ReceiverImpl::getCapacity()
 
 uint32_t ReceiverImpl::available()
 {
-    return parent.available(destination);
+    return parent->available(destination);
 }
 
 uint32_t ReceiverImpl::pendingAck()
 {
-    return parent.pendingAck(destination);
+    return parent->pendingAck(destination);
 }
 
 ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, 
                            const qpid::messaging::Address& a) : 
 
-    parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), 
+    parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), 
     state(UNRESOLVED), capacity(0), window(0) {}
 
 bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
-    return parent.get(*this, message, timeout);
+    return parent->get(*this, message, timeout);
 }
 
 bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -172,7 +172,7 @@ void ReceiverImpl::closeImpl() 
     if (state != CANCELLED) {
         state = CANCELLED;
         source->cancel(session, destination);
-        parent.receiverCancelled(destination);
+        parent->receiverCancelled(destination);
     }
 }
 
@@ -188,7 +188,7 @@ void ReceiverImpl::setCapacityImpl(uint3
 }
 qpid::messaging::Session ReceiverImpl::getSession() const
 {
-    return qpid::messaging::Session(&parent);
+    return qpid::messaging::Session(parent.get());
 }
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/amqp0_10/SessionImpl.h"
 #include "qpid/messaging/Duration.h"
+#include <boost/intrusive_ptr.hpp>
 #include <memory>
 
 namespace qpid {
@@ -65,7 +66,7 @@ class ReceiverImpl : public qpid::messag
     void received(qpid::messaging::Message& message);
     qpid::messaging::Session getSession() const;
   private:
-    SessionImpl& parent;
+    boost::intrusive_ptr<SessionImpl> parent;
     const std::string destination;
     const qpid::messaging::Address address;
     const uint32_t byteCredit;
@@ -133,13 +134,13 @@ class ReceiverImpl : public qpid::messag
     template <class F> void execute()
     {
         F f(*this);
-        parent.execute(f);
+        parent->execute(f);
     }
     
     template <class F, class P> void execute1(P p)
     {
         F f(*this, p);
-        parent.execute(f);
+        parent->execute(f);
     }
 };
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Mar 23 18:00:49 2010
@@ -31,17 +31,17 @@ namespace amqp0_10 {
 
 SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
                        const qpid::messaging::Address& _address) : 
-    parent(_parent), name(_name), address(_address), state(UNRESOLVED),
+    parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
     capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
 
 void SenderImpl::send(const qpid::messaging::Message& message) 
 {
     if (unreliable) {
         UnreliableSend f(*this, &message);
-        parent.execute(f);
+        parent->execute(f);
     } else {
         Send f(*this, &message);
-        while (f.repeat) parent.execute(f);
+        while (f.repeat) parent->execute(f);
     }
 }
 
@@ -60,7 +60,7 @@ uint32_t SenderImpl::getCapacity() { ret
 uint32_t SenderImpl::pending()
 {
     CheckPendingSends f(*this, false);
-    parent.execute(f);
+    parent->execute(f);
     return f.pending;
 } 
 
@@ -73,7 +73,7 @@ void SenderImpl::init(qpid::client::Asyn
     }
     if (state == CANCELLED) {
         sink->cancel(session, name);
-        parent.senderCancelled(name);
+        parent->senderCancelled(name);
     } else {
         sink->declare(session, name);
         replay();
@@ -140,7 +140,7 @@ void SenderImpl::closeImpl()
 {
     state = CANCELLED;
     sink->cancel(session, name);
-    parent.senderCancelled(name);
+    parent->senderCancelled(name);
 }
 
 const std::string& SenderImpl::getName() const
@@ -150,7 +150,7 @@ const std::string& SenderImpl::getName()
 
 qpid::messaging::Session SenderImpl::getSession() const
 {
-    return qpid::messaging::Session(&parent);
+    return qpid::messaging::Session(parent.get());
 }
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/amqp0_10/SessionImpl.h"
 #include <memory>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/ptr_container/ptr_deque.hpp>
 
 namespace qpid {
@@ -58,7 +59,7 @@ class SenderImpl : public qpid::messagin
     qpid::messaging::Session getSession() const;
 
   private:
-    SessionImpl& parent;
+    boost::intrusive_ptr<SessionImpl> parent;
     const std::string name;
     const qpid::messaging::Address address;
     State state;
@@ -143,13 +144,13 @@ class SenderImpl : public qpid::messagin
     template <class F> void execute()
     {
         F f(*this);
-        parent.execute(f);
+        parent->execute(f);
     }
     
     template <class F, class P> bool execute1(P p)
     {
         F f(*this, p);
-        return parent.execute(f);
+        return parent->execute(f);
     }    
 };
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -24,7 +24,7 @@
 #include "qpid/client/amqp0_10/SenderImpl.h"
 #include "qpid/client/amqp0_10/MessageSource.h"
 #include "qpid/client/amqp0_10/MessageSink.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
@@ -49,7 +49,7 @@ namespace qpid {
 namespace client {
 namespace amqp0_10 {
 
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
 
 
 void SessionImpl::sync()
@@ -108,13 +108,13 @@ void SessionImpl::close()
     for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
     
 
-    connection.closed(*this);
+    connection->closed(*this);
     session.close();
 }
 
 template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
 {
-    return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
+    return boost::dynamic_pointer_cast<S>(qpid::messaging::PrivateImplRef<T>::get(t));
 }
 
 template <class T> void getFreeKey(std::string& key, T& map)
@@ -431,12 +431,12 @@ void SessionImpl::senderCancelled(const 
 
 void SessionImpl::reconnect()
 {
-    connection.reconnect();    
+    connection->connect();    
 }
 
 qpid::messaging::Connection SessionImpl::getConnection() const
 {
-    return qpid::messaging::Connection(&connection);
+    return qpid::messaging::Connection(connection.get());
 }
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Tue Mar 23 18:00:49 2010
@@ -29,6 +29,7 @@
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/client/amqp0_10/IncomingMessages.h"
 #include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 
@@ -106,7 +107,7 @@ class SessionImpl : public qpid::messagi
     typedef std::map<std::string, qpid::messaging::Sender> Senders;
 
     mutable qpid::sys::Mutex lock;
-    ConnectionImpl& connection;
+    boost::intrusive_ptr<ConnectionImpl> connection;
     qpid::client::Session session;
     AddressResolution resolver;
     IncomingMessages incoming;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Tue Mar 23 18:00:49 2010
@@ -25,6 +25,7 @@
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/SecuritySettings.h"
 #include "qpid/log/Statement.h"
 
 #include "boost/tokenizer.hpp"
@@ -33,6 +34,7 @@ namespace qpid {
 namespace client {
 
 using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
 using qpid::framing::InternalErrorException;
 
 class WindowsSasl : public Sasl
@@ -40,7 +42,7 @@ class WindowsSasl : public Sasl
   public:
     WindowsSasl(const ConnectionSettings&);
     ~WindowsSasl();
-    std::string start(const std::string& mechanisms, unsigned int ssf);
+    std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
     std::string step(const std::string& challenge);
     std::string getMechanism();
     std::string getUserId();
@@ -91,7 +93,7 @@ WindowsSasl::~WindowsSasl() 
 }
 
 std::string WindowsSasl::start(const std::string& mechanisms,
-                               unsigned int /*ssf*/)
+                               const SecuritySettings* /*externalSettings*/)
 {
     QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 23 18:00:49 2010
@@ -16,7 +16,8 @@
  *
  */
 
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
  *
  * The cluster works on the principle that if all members of the
  * cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
  *  
  * The following are the current areas where broker uses timers or timestamps:
  * 
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- *   a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ *   connection::getClusterOrderOutput, a FrameHandler that sends
+ *   frames to the client via the cluster. Used by broker::SessionState
  *   
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ *   implemented by cluster::ExpiryPolicy.
  * 
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ *   session command counting so OK to ignore.
  * 
  * - LinkRegistry: only cluster elder is ever active for links.
  * 
@@ -57,7 +61,10 @@
  *
  * cluster::ExpiryPolicy implements the strategy for message expiry.
  *
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
  * 
  * Messages sent to/from CPG are called Events.
  *
@@ -84,12 +91,16 @@
  *  - Connection control events carrying non-cluster frames: frames sent to the client.
  *    e.g. flow-control frames generated on a timer.
  *
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
  *
  * When a new member joins the CPG group, all members (including the
  * new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster it
+ * is then in INIT mode until the configured cluster-size members
+ * have joined.
  *
  * The newcomer uses initial status to determine
  *  - The cluster UUID
@@ -97,11 +108,16 @@
  *  - Do I need to get an update from an existing active member?
  *  - Can I recover from my own store?
  *
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until initial-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
  */
 #include "qpid/Exception.h"
 #include "qpid/cluster/Cluster.h"
@@ -191,16 +207,19 @@ struct ClusterDispatcher : public framin
 
     void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
                        uint8_t storeState, const Uuid& shutdownId,
-                       const framing::SequenceNumber& configSeq,
                        const std::string& firstConfig)
     {
         cluster.initialStatus(
             member, version, active, clusterId,
-            framing::cluster::StoreState(storeState), shutdownId, configSeq,
+            framing::cluster::StoreState(storeState), shutdownId, 
             firstConfig, l);
     }
-    void ready(const std::string& url) { cluster.ready(member, url, l); }
-    void configChange(const std::string& current) { cluster.configChange(member, current, l); }
+    void ready(const std::string& url) {
+        cluster.ready(member, url, l);
+    }
+    void configChange(const std::string& current) {
+        cluster.configChange(member, current, l);
+    }
     void updateOffer(uint64_t updatee) {
         cluster.updateOffer(member, updatee, l);
     }
@@ -241,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& 
     quorum(boost::bind(&Cluster::leave, this)),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
-    state(INIT),
+    state(PRE_INIT),
     initMap(self, settings.size),
     store(broker.getDataDir().getPath()),
     elder(false),
@@ -271,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& 
     // without modifying delivery-properties.exchange.
     broker.getExchanges().registerExchange(
         boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        if (store.getState() == STORE_STATE_DIRTY_STORE)
-            broker.setRecovery(false); // Ditch my current store.
         if (store.getClusterId())
             clusterId = store.getClusterId(); // Use stored ID if there is one.
         QPID_LOG(notice, "Cluster store state: " << store)
     }
-
     cpg.join(name);
+    // pump the CPG dispatch manually till we get past PRE_INIT.
+    while (state == PRE_INIT)
+        cpg.dispatchOne();
 }
 
 Cluster::~Cluster() {
@@ -298,9 +318,14 @@ void Cluster::initialize() {
     dispatcher.start();
     deliverEventQueue.start();
     deliverFrameQueue.start();
+    mcast.start();
+
+    // Run initMapCompleted immediately to process the initial configuration.
+    assert(state == INIT);
+    initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
 
     // Add finalizer last for exception safety.
-    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); 
+    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
 }
 
 // Called in connection thread to insert a client connection.
@@ -510,8 +535,13 @@ ConnectionPtr Cluster::getConnection(con
             assert(cp);
         }
         else {              // New remote connection, create a shadow.
-            unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
-            cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf);
+            qpid::sys::SecuritySettings secSettings;
+            if (announce) {
+                secSettings.ssf = announce->getSsf();
+                secSettings.authid = announce->getAuthid();
+                secSettings.nodict = announce->getNodict();
+            }
+            cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings);
         }
         connections.insert(ConnectionMap::value_type(id, cp));
     }
@@ -571,9 +601,27 @@ void Cluster::setReady(Lock&) {
 void Cluster::initMapCompleted(Lock& l) {
     // Called on completion of the initial status map. 
     QPID_LOG(debug, *this << " initial status map complete. ");
-    if (state == INIT) {
-        // We have status for all members so we can make join descisions.
+    if (state == PRE_INIT) {
+        // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+        // We decide here whether we want to recover from our store.
+        // We won't recover if we are joining an active cluster or our store is dirty.
+        if (store.hasStore() &&
+            (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+            broker.setRecovery(false); // Ditch my current store.
+        state = INIT;
+    }
+    else if (state == INIT) {
+        // INIT means we are past Cluster::initialize().
+
+        // If we're forming an initial cluster (no active members)
+        // then we wait to reach the configured cluster-size
+        if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+            QPID_LOG(info, *this << initMap.getActualSize()
+                     << " members, waiting for at least " << initMap.getRequiredSize());
+            return;
+        }
         initMap.checkConsistent();
+
         elders = initMap.getElders();
         QPID_LOG(debug, *this << " elders: " << elders);
         if (elders.empty())
@@ -626,7 +674,7 @@ void Cluster::configChange(const MemberI
         mcast.mcastControl(
             ClusterInitialStatusBody(
                 ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
-                store.getState(), store.getShutdownId(), store.getConfigSeq(),
+                store.getState(), store.getShutdownId(), 
                 initMap.getFirstConfigStr()
             ),
             self);
@@ -668,15 +716,13 @@ struct AppendQueue {
 
 // Log a snapshot of broker state, used for debugging inconsistency problems.
 // May only be called in deliver thread.
-void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
+std::string Cluster::debugSnapshot() {
     assertClusterSafe();
     std::ostringstream msg;
-    msg << prefix;
-    if (connection) msg << " " << connection->getId();
-    msg << " snapshot " << map.getFrameSeq() << ":";
+    msg << "queue snapshot at " << map.getFrameSeq() << ":";
     AppendQueue append(msg);
     broker.getQueues().eachQueue(append);
-    QPID_LOG(trace, msg.str());
+    return msg.str();
 }
 
 // Called from Broker::~Broker when broker is shut down.  At this
@@ -702,7 +748,6 @@ void Cluster::initialStatus(const Member
                             const framing::Uuid& id, 
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
-                            const framing::SequenceNumber& configSeq,
                             const std::string& firstConfig,
                             Lock& l)
 {
@@ -715,7 +760,7 @@ void Cluster::initialStatus(const Member
     initMap.received(
         member,
         ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
-                                 store, shutdownId, configSeq, firstConfig)
+                                 store, shutdownId, firstConfig)
     );
     if (initMap.transitionToComplete()) initMapCompleted(l);
 }
@@ -762,8 +807,9 @@ void Cluster::updateOffer(const MemberId
         deliverEventQueue.start(); // Not involved in update.
     }
     if (updatee != self && url) {
-        debugSnapshot("join");
+        QPID_LOG(debug, debugSnapshot());
         if (mAgent) mAgent->clusterUpdate();
+        // Updatee will call clusterUpdate when update completes
     }
 }
 
@@ -836,7 +882,7 @@ void Cluster::checkUpdateIn(Lock& l) {
         if (mAgent) mAgent->suppress(false); // Enable management output.
         discarding = false;     // ok to set, we're stalled for update.
         QPID_LOG(notice, *this << " update complete, starting catch-up.");
-        debugSnapshot("initial");
+        QPID_LOG(debug, debugSnapshot());
         if (mAgent) mAgent->clusterUpdate();
         deliverEventQueue.start();
     }
@@ -963,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = {
-        "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+        "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+        "READY", "OFFER", "UPDATER", "LEFT"
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
     o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1003,12 +1050,14 @@ void Cluster::errorCheck(const MemberId&
 }
 
 void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
-    timer->deliverWakeup(name);
+    if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+        timer->deliverWakeup(name);
 }
 
 void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
     QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
-    timer->deliverDrop(name);
+    if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+        timer->deliverDrop(name);
 }
 
 bool Cluster::isElder() const {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org