You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC
svn commit: r926686 [2/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/
cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/
cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/
cpp/include/qpid/client/amqp0_10/ cpp/incl...
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt Tue Mar 23 18:00:49 2010
@@ -589,19 +589,6 @@ set_target_properties (qpidcommon PROPER
install (TARGETS qpidcommon
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_COMMON})
-# When the client programming component is installed, Windows also needs
-# the debug variant of qpidcommon and qpidclient, and the PDBs for those
-# as well. It would be nice to figure out a way to put some sanity checking
-# here... as it is, success relies on the packager building the debug then
-# the release, then packaging the release build.
-if (WIN32)
- install (PROGRAMS
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.dll
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.lib
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidcommond.pdb
- DESTINATION ${QPID_INSTALL_LIBDIR}
- COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
set (qpidclient_SOURCES
${rgen_client_srcs}
@@ -667,6 +654,7 @@ set (qpidclient_SOURCES
qpid/client/amqp0_10/CodecsInternal.h
qpid/client/amqp0_10/ConnectionImpl.h
qpid/client/amqp0_10/ConnectionImpl.cpp
+ qpid/client/amqp0_10/FailoverUpdates.cpp
qpid/client/amqp0_10/IncomingMessages.h
qpid/client/amqp0_10/IncomingMessages.cpp
qpid/client/amqp0_10/MessageSink.h
@@ -679,6 +667,8 @@ set (qpidclient_SOURCES
qpid/client/amqp0_10/SessionImpl.cpp
qpid/client/amqp0_10/SenderImpl.h
qpid/client/amqp0_10/SenderImpl.cpp
+ qpid/client/amqp0_10/SimpleUrlParser.h
+ qpid/client/amqp0_10/SimpleUrlParser.cpp
)
add_library (qpidclient SHARED ${qpidclient_SOURCES})
@@ -699,14 +689,6 @@ if (NOT QPID_GENERATED_HEADERS_IN_SOURCE
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE})
endif (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
-if (WIN32)
- install (PROGRAMS
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.dll
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.lib
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpidclientd.pdb
- DESTINATION ${QPID_INSTALL_LIBDIR}
- COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
if (WIN32)
set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
@@ -784,7 +766,8 @@ set (qpidbroker_SOURCES
qpid/broker/TxPublish.cpp
qpid/broker/Vhost.cpp
qpid/management/ManagementAgent.cpp
- qpid/management/ManagementExchange.cpp
+ qpid/management/ManagementDirectExchange.cpp
+ qpid/management/ManagementTopicExchange.cpp
qpid/sys/TCPIOPlugin.cpp
)
add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
@@ -828,7 +811,7 @@ set (qmf_SOURCES
qpid/agent/ManagementAgentImpl.h
)
add_library (qmf SHARED ${qmf_SOURCES})
-target_link_libraries (qmf qmfengine)
+target_link_libraries (qmf qpidclient)
set_target_properties (qmf PROPERTIES
VERSION ${qmf_version})
install (TARGETS qmf OPTIONAL
@@ -922,15 +905,6 @@ set_target_properties (qmfconsole PROPER
install (TARGETS qmfconsole
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
-# On Windows, also grab the debug version of qmfconsole.
-if (WIN32)
- install (PROGRAMS
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.dll
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.lib
- ${CMAKE_CURRENT_BINARY_DIR}/Debug/qmfconsoled.pdb
- DESTINATION ${QPID_INSTALL_LIBDIR}
- COMPONENT ${QPID_COMPONENT_QMF})
-endif (WIN32)
# A queue event listener plugin that creates messages on a replication
# queue corresponding to enqueue and dequeue events:
@@ -976,6 +950,5 @@ add_definitions(-DHAVE_CONFIG_H)
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/config.h)
-
add_subdirectory(qpid/store)
add_subdirectory(tests)
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/Makefile.am Tue Mar 23 18:00:49 2010
@@ -465,6 +465,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/Runnable.cpp \
qpid/sys/ScopedIncrement.h \
qpid/sys/SecurityLayer.h \
+ qpid/sys/SecuritySettings.h \
qpid/sys/Semaphore.h \
qpid/sys/Shlib.cpp \
qpid/sys/Shlib.h \
@@ -637,8 +638,10 @@ libqpidbroker_la_SOURCES = \
qpid/management/IdAllocator.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
- qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementExchange.h \
+ qpid/management/ManagementDirectExchange.cpp \
+ qpid/management/ManagementDirectExchange.h \
+ qpid/management/ManagementTopicExchange.cpp \
+ qpid/management/ManagementTopicExchange.h \
qpid/sys/TCPIOPlugin.cpp
@@ -711,6 +714,7 @@ libqpidclient_la_SOURCES = \
qpid/messaging/Message.cpp \
qpid/messaging/MessageImpl.h \
qpid/messaging/MessageImpl.cpp \
+ qpid/messaging/PrivateImplRef.h \
qpid/messaging/Sender.cpp \
qpid/messaging/Receiver.cpp \
qpid/messaging/Session.cpp \
@@ -728,6 +732,7 @@ libqpidclient_la_SOURCES = \
qpid/client/amqp0_10/CodecsInternal.h \
qpid/client/amqp0_10/ConnectionImpl.h \
qpid/client/amqp0_10/ConnectionImpl.cpp \
+ qpid/client/amqp0_10/FailoverUpdates.cpp \
qpid/client/amqp0_10/IncomingMessages.h \
qpid/client/amqp0_10/IncomingMessages.cpp \
qpid/client/amqp0_10/MessageSink.h \
@@ -739,7 +744,9 @@ libqpidclient_la_SOURCES = \
qpid/client/amqp0_10/SessionImpl.h \
qpid/client/amqp0_10/SessionImpl.cpp \
qpid/client/amqp0_10/SenderImpl.h \
- qpid/client/amqp0_10/SenderImpl.cpp
+ qpid/client/amqp0_10/SenderImpl.cpp \
+ qpid/client/amqp0_10/SimpleUrlParser.h \
+ qpid/client/amqp0_10/SimpleUrlParser.cpp
# NOTE: only public header files (which should be in ../include)
# should go in this list. Private headers should go in the SOURCES
@@ -812,20 +819,23 @@ nobase_include_HEADERS += \
../include/qpid/sys/Thread.h \
../include/qpid/sys/Time.h \
../include/qpid/messaging/Address.h \
- ../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Codec.h \
+ ../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Duration.h \
+ ../include/qpid/messaging/Handle.h \
+ ../include/qpid/messaging/ImportExport.h \
../include/qpid/messaging/ListContent.h \
../include/qpid/messaging/ListView.h \
../include/qpid/messaging/MapContent.h \
../include/qpid/messaging/MapView.h \
../include/qpid/messaging/Message.h \
- ../include/qpid/messaging/Sender.h \
../include/qpid/messaging/Receiver.h \
+ ../include/qpid/messaging/Sender.h \
../include/qpid/messaging/Session.h \
../include/qpid/messaging/Uuid.h \
../include/qpid/messaging/Variant.h \
- ../include/qpid/client/amqp0_10/Codecs.h
+ ../include/qpid/client/amqp0_10/Codecs.h \
+ ../include/qpid/client/amqp0_10/FailoverUpdates.h
# Force build of qpidd during dist phase so help2man will work.
dist-hook: $(BUILT_SOURCES)
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/Version.h Tue Mar 23 18:00:49 2010
@@ -23,10 +23,11 @@
#ifdef HAVE_CONFIG_H
# include "config.h"
+#else
+# error "config.h not generated"
#endif
namespace qpid {
-#ifdef HAVE_CONFIG_H
const std::string product = PACKAGE_NAME;
const std::string version = PACKAGE_VERSION;
# if HAVE_SASL
@@ -34,11 +35,6 @@ namespace qpid {
# else
const std::string saslName = "qpidd-no-sasl";
# endif
-#else
- const std::string product = "qpidc";
- const std::string version = "0.7";
- const std::string saslName = "qpid-broker";
-#endif
}
#endif /*!QPID_VERSION_H*/
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp Tue Mar 23 18:00:49 2010
@@ -35,7 +35,8 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -234,11 +235,22 @@ Broker::Broker(const Broker::Options& co
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementExchange::typeName);
- Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
- Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
- boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get());
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
+
+ std::string qmfTopic("qmf.default.topic");
+ std::string qmfDirect("qmf.default.direct");
+
+ std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
+ std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+
+ boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
+
+ managementAgent->setExchangeV2(topicPair.first, directPair.first);
}
else
QPID_LOG(info, "Management not enabled");
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.cpp Tue Mar 23 18:00:49 2010
@@ -23,6 +23,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -72,9 +73,10 @@ struct ConnectionTimeoutTask : public sy
}
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId, bool shadow_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_,
+ const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
ConnectionState(out_, broker_),
- ssf(ssf),
+ securitySettings(external),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
@@ -99,7 +101,7 @@ Connection::Connection(ConnectionOutputH
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
mgmtObject->set_shadow(shadow);
- agent->addObject(mgmtObject, objectId, true);
+ agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Connection.h Tue Mar 23 18:00:49 2010
@@ -45,6 +45,7 @@
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Mutex.h"
@@ -78,7 +79,8 @@ class Connection : public sys::Connectio
virtual void connectionError(const std::string&) = 0;
};
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, unsigned int ssf,
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId,
+ const qpid::sys::SecuritySettings&,
bool isLink = false, uint64_t objectId = 0, bool shadow=false);
~Connection ();
@@ -136,14 +138,17 @@ class Connection : public sys::Connectio
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
- unsigned int getSSF() { return ssf; }
+ const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
+ {
+ return securitySettings;
+ }
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
ChannelMap channels;
- unsigned int ssf;
+ qpid::sys::SecuritySettings securitySettings;
ConnectionHandler adapter;
const bool isLink;
bool mgmtClosing;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Mar 23 18:00:49 2010
@@ -22,12 +22,14 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
using framing::ProtocolVersion;
+using qpid::sys::SecuritySettings;
typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr;
typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
@@ -37,7 +39,7 @@ ConnectionFactory::~ConnectionFactory()
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
- unsigned int ) {
+ const SecuritySettings& external) {
if (broker.getConnectionCounter().allowConnection())
{
QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
@@ -45,7 +47,7 @@ ConnectionFactory::create(ProtocolVersio
}
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
- c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, false)));
return c.release();
}
return 0;
@@ -53,10 +55,10 @@ ConnectionFactory::create(ProtocolVersio
sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
- unsigned int) {
+ const SecuritySettings& external) {
// used to create connections from one broker to another
ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
- c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true)));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, external, true)));
return c.release();
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ConnectionFactory.h Tue Mar 23 18:00:49 2010
@@ -36,11 +36,10 @@ class ConnectionFactory : public sys::Co
sys::ConnectionCodec*
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ const qpid::sys::SecuritySettings&);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&);
private:
Broker& broker;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Mar 23 18:00:49 2010
@@ -24,7 +24,8 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
@@ -52,8 +53,10 @@ pair<Exchange::shared_ptr, bool> Exchang
exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
}else if (type == HeadersExchange::typeName) {
exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Mar 23 18:00:49 2010
@@ -252,7 +252,7 @@ namespace
{
bool match_values(const FieldValue& bind, const FieldValue& msg) {
- return bind.empty() || bind == msg;
+ return bind.getType() == 0xf0 || bind == msg;
}
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Tue Mar 23 18:00:49 2010
@@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessage
IncompleteMessageList::~IncompleteMessageList()
{
- sys::Mutex::ScopedLock l(lock);
+ // No lock here. We are relying on Message::reset*CompleteCallback
+ // to ensure no callbacks are in progress before they return.
for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) {
(*i)->resetEnqueueCompleteCallback();
(*i)->resetDequeueCompleteCallback();
@@ -78,7 +79,7 @@ void IncompleteMessageList::each(const C
sys::Mutex::ScopedLock l(lock);
snapshot = incomplete;
}
- std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
+ std::for_each(incomplete.begin(), incomplete.end(), listen);
}
}}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.cpp Tue Mar 23 18:00:49 2010
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+ inCallback(false), requiredCredit(0) {}
Message::~Message()
{
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boos
replacement[qfor] = msg;
}
+namespace {
+struct ScopedSet {
+ sys::Monitor& lock;
+ bool& flag;
+ ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+ sys::Monitor::ScopedLock sl(lock);
+ flag = true;
+ }
+ ~ScopedSet(){
+ sys::Monitor::ScopedLock sl(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
void Message::allEnqueuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = &cb;
}
void Message::resetEnqueueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = 0;
}
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = &cb;
}
void Message::resetDequeueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = 0;
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Message.h Tue Mar 23 18:00:49 2010
@@ -26,7 +26,7 @@
#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
@@ -189,9 +189,10 @@ public:
mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
- sys::Mutex callbackLock;
+ sys::Monitor callbackLock;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+ bool inCallback;
uint32_t requiredCredit;
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Mar 23 18:00:49 2010
@@ -26,6 +26,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/SecuritySettings.h"
#include <boost/format.hpp>
#if HAVE_SASL
@@ -36,6 +37,7 @@ using qpid::sys::cyrus::CyrusSecurityLay
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
using boost::format;
using boost::str;
@@ -152,7 +154,8 @@ void NullAuthenticator::start(const stri
#if HAVE_SASL
// encryption required - check to see if we are running over an
// encrypted SSL connection.
- sasl_ssf_t external_ssf = (sasl_ssf_t) connection.getSSF();
+ SecuritySettings external = connection.getExternalSecuritySettings();
+ sasl_ssf_t external_ssf = (sasl_ssf_t) external.ssf;
if (external_ssf < 1) // < 1 == unencrypted
#endif
{
@@ -244,7 +247,9 @@ void CyrusAuthenticator::init()
// If the transport provides encryption, notify the SASL library of
// the key length and set the ssf range to prevent double encryption.
- sasl_ssf_t external_ssf = (sasl_ssf_t) connection.getSSF();
+ SecuritySettings external = connection.getExternalSecuritySettings();
+ QPID_LOG(debug, "External ssf=" << external.ssf << " and auth=" << external.authid);
+ sasl_ssf_t external_ssf = (sasl_ssf_t) external.ssf;
if (external_ssf) {
int result = sasl_setprop(sasl_conn, SASL_SSF_EXTERNAL, &external_ssf);
if (result != SASL_OK) {
@@ -258,16 +263,25 @@ void CyrusAuthenticator::init()
", max_ssf: " << secprops.max_ssf <<
", external_ssf: " << external_ssf );
+ if (!external.authid.empty()) {
+ const char* external_authid = external.authid.c_str();
+ int result = sasl_setprop(sasl_conn, SASL_AUTH_EXTERNAL, external_authid);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external auth: " << result));
+ }
+
+ QPID_LOG(debug, "external auth detected and set to " << external_authid);
+ }
+
secprops.maxbufsize = 65535;
secprops.property_names = 0;
secprops.property_values = 0;
secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
-
+ if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
if (result != SASL_OK) {
throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
}
-
}
CyrusAuthenticator::~CyrusAuthenticator()
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Mar 23 18:00:49 2010
@@ -23,12 +23,14 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/SecureConnection.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
using framing::ProtocolVersion;
+using qpid::sys::SecuritySettings;
typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
typedef std::auto_ptr<Connection> ConnectionPtr;
@@ -38,7 +40,7 @@ SecureConnectionFactory::SecureConnectio
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
- unsigned int conn_ssf ) {
+ const SecuritySettings& external) {
if (broker.getConnectionCounter().allowConnection())
{
QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
@@ -47,7 +49,7 @@ SecureConnectionFactory::create(Protocol
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, conn_ssf, false));
+ ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
@@ -58,11 +60,11 @@ SecureConnectionFactory::create(Protocol
sys::ConnectionCodec*
SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
- unsigned int conn_ssf) {
+ const SecuritySettings& external) {
// used to create connections from one broker to another
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, true));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, conn_ssf, true ));
+ ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h Tue Mar 23 18:00:49 2010
@@ -34,11 +34,10 @@ class SecureConnectionFactory : public s
sys::ConnectionCodec*
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ const qpid::sys::SecuritySettings&);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id,
- unsigned int conn_ssf);
+ create(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&);
private:
Broker& broker;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Mar 23 18:00:49 2010
@@ -213,7 +213,7 @@ void ConnectionHandler::start(const Fiel
if (sasl.get()) {
string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
- getSSF ? getSSF() : 0);
+ getSecuritySettings ? getSecuritySettings() : 0);
proxy.startOk(properties, sasl->getMechanism(), response, locale);
} else {
//TODO: verify that desired mechanism and locale are supported
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionHandler.h Tue Mar 23 18:00:49 2010
@@ -40,6 +40,11 @@
#include <memory>
namespace qpid {
+
+namespace sys {
+struct SecuritySettings;
+}
+
namespace client {
class ConnectionHandler : private StateManager,
@@ -95,7 +100,7 @@ public:
using InputHandler::handle;
typedef boost::function<void()> CloseListener;
typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
- typedef boost::function<unsigned int()> GetConnSSF;
+ typedef boost::function<const qpid::sys::SecuritySettings*()> GetSecuritySettings;
ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
@@ -123,7 +128,7 @@ public:
static framing::connection::CloseCode convert(uint16_t replyCode);
const std::string& getUserId() const { return operUserId; }
- GetConnSSF getSSF; /** query the connection for its security strength factor */
+ GetSecuritySettings getSecuritySettings; /** query the transport for its security details */
};
}}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -134,6 +134,15 @@ IOThread& theIO() {
return io;
}
+// Bring theIO into existence on library load rather than first use.
+// This avoids it being destroyed whilst something in the main program
+// still exists
+struct InitAtLoad {
+ InitAtLoad() {
+ (void) theIO();
+ }
+} init;
+
class HeartbeatTask : public TimerTask {
TimeoutHandler& timeout;
@@ -165,7 +174,7 @@ ConnectionImpl::ConnectionImpl(framing::
CLOSE_CODE_NORMAL, std::string());
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
- handler.getSSF = boost::bind(&Connector::getSSF, boost::ref(connector));
+ handler.getSecuritySettings = boost::bind(&Connector::getSecuritySettings, boost::ref(connector));
}
const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max();
@@ -195,7 +204,8 @@ void ConnectionImpl::addSession(const bo
throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId()));
} //else channel is busy, but we can keep looking for a free one
}
-
+ // If we get here, we didn't find any available channel.
+ throw ResourceLimitExceededException("There are no channels available");
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Connector.h Tue Mar 23 18:00:49 2010
@@ -35,6 +35,7 @@ namespace sys {
class ShutdownHandler;
class SecurityLayer;
class Poller;
+struct SecuritySettings;
}
namespace framing {
@@ -74,7 +75,7 @@ class Connector : public framing::Output
virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
- virtual unsigned int getSSF() = 0;
+ virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
};
}}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Mar 23 18:00:49 2010
@@ -109,7 +109,7 @@ class RdmaConnector : public Connector,
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
- unsigned int getSSF() { return 0; }
+ const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/Sasl.h Tue Mar 23 18:00:49 2010
@@ -30,6 +30,7 @@ namespace qpid {
namespace sys {
class SecurityLayer;
+struct SecuritySettings;
}
namespace client {
@@ -48,17 +49,10 @@ class Sasl
*
* @param mechanisms Comma-separated list of the SASL mechanism the
* client supports.
- * @param ssf Security Strength Factor (SSF). SSF is used to negotiate
- * a SASL security layer on top of the connection should both
- * parties require and support it. The value indicates the
- * required level of security for communication. Possible
- * values are:
- * @li 0 No security
- * @li 1 Integrity checking only
- * @li >1 Integrity and confidentiality with the number
- * giving the encryption key length.
+ * @param externalSecuritySettings security related details from the underlying transport
*/
- virtual std::string start(const std::string& mechanisms, unsigned int ssf) = 0;
+ virtual std::string start(const std::string& mechanisms,
+ const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
virtual std::string step(const std::string& challenge) = 0;
virtual std::string getMechanism() = 0;
virtual std::string getUserId() = 0;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SaslFactory.cpp Tue Mar 23 18:00:49 2010
@@ -61,6 +61,7 @@ std::auto_ptr<SaslFactory> SaslFactory::
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
#include "qpid/log/Statement.h"
#include <sasl/sasl.h>
@@ -70,6 +71,7 @@ namespace qpid {
namespace client {
using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
using qpid::sys::cyrus::CyrusSecurityLayer;
using qpid::framing::InternalErrorException;
@@ -80,7 +82,7 @@ class CyrusSasl : public Sasl
public:
CyrusSasl(const ConnectionSettings&);
~CyrusSasl();
- std::string start(const std::string& mechanisms, unsigned int ssf);
+ std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -176,7 +178,7 @@ namespace {
const std::string SSL("ssl");
}
-std::string CyrusSasl::start(const std::string& mechanisms, unsigned int ssf)
+std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings)
{
QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
int result = sasl_client_new(settings.service.c_str(),
@@ -190,14 +192,22 @@ std::string CyrusSasl::start(const std::
sasl_security_properties_t secprops;
- if (ssf) {
- sasl_ssf_t external_ssf = (sasl_ssf_t) ssf;
+ if (externalSettings) {
+ sasl_ssf_t external_ssf = (sasl_ssf_t) externalSettings->ssf;
if (external_ssf) {
int result = sasl_setprop(conn, SASL_SSF_EXTERNAL, &external_ssf);
if (result != SASL_OK) {
throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external SSF: " << result));
}
- QPID_LOG(debug, "external SSF detected and set to " << ssf);
+ QPID_LOG(debug, "external SSF detected and set to " << external_ssf);
+ }
+ if (externalSettings->authid.size()) {
+ const char* external_authid = externalSettings->authid.c_str();
+ result = sasl_setprop(conn, SASL_AUTH_EXTERNAL, external_authid);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: unable to set external auth: " << result));
+ }
+ QPID_LOG(debug, "external auth detected and set to " << external_authid);
}
}
@@ -216,7 +226,6 @@ std::string CyrusSasl::start(const std::
throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn)));
}
-
sasl_interact_t* client_interact = 0;
const char *out = 0;
unsigned outlen = 0;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Mar 23 18:00:49 2010
@@ -34,6 +34,7 @@
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/Msg.h"
#include <iostream>
@@ -86,6 +87,7 @@ class SslConnector : public Connector
const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
+ SecuritySettings securitySettings;
sys::Mutex closedLock;
bool closed;
@@ -125,7 +127,7 @@ class SslConnector : public Connector
sys::ShutdownHandler* getShutdownHandler() const;
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
- unsigned int getSSF() { return socket.getKeyLen(); }
+ const SecuritySettings* getSecuritySettings();
public:
SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
@@ -366,4 +368,11 @@ void SslConnector::eof(SslIO&) {
handleClosed();
}
+const SecuritySettings* SslConnector::getSecuritySettings()
+{
+ securitySettings.ssf = socket.getKeyLen();
+ securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
+ return &securitySettings;
+}
+
}} // namespace qpid::client
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.cpp Tue Mar 23 18:00:49 2010
@@ -199,8 +199,19 @@ void TCPConnector::send(AMQFrame& frame)
} else {
notifyWrite = (currentSize >= maxFrameSize);
}
- }
+ /*
+ NOTE: Moving the following line into this mutex block
+ is a workaround for BZ 570168, in which the test
+ testConcurrentSenders causes a hang about 1.5%
+ of the time. ( To see the hang much more frequently
+ leave this line out of the mutex block, and put a
+ small usleep just before it.)
+
+ TODO mgoulish - fix the underlying cause and then
+ move this call back outside the mutex.
+ */
if (notifyWrite && !closed) aio->notifyPendingWrite();
+ }
}
void TCPConnector::handleClosed() {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/TCPConnector.h Tue Mar 23 18:00:49 2010
@@ -92,7 +92,7 @@ class TCPConnector : public Connector, p
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
- unsigned int getSSF() { return 0; }
+ const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Tue Mar 23 18:00:49 2010
@@ -25,8 +25,10 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/List.h"
+#include "qpid/log/Statement.h"
#include <algorithm>
#include <functional>
+#include <limits>
using namespace qpid::framing;
using namespace qpid::messaging;
@@ -39,6 +41,7 @@ namespace {
const std::string iso885915("iso-8859-15");
const std::string utf8("utf8");
const std::string utf16("utf16");
+const std::string binary("binary");
const std::string amqp0_10_binary("amqp0-10:binary");
const std::string amqp0_10_bit("amqp0-10:bit");
const std::string amqp0_10_datetime("amqp0-10:datetime");
@@ -129,7 +132,7 @@ Variant toVariant(boost::shared_ptr<Fiel
case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
case 0x04: break; //TODO: iso-8859-15 char
- case 0x08: out = in->getIntegerValue<bool, 1>(); break;
+ case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;
case 0x10: out.setEncoding(amqp0_10_binary);
case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;
case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;
@@ -163,8 +166,8 @@ Variant toVariant(boost::shared_ptr<Fiel
case 0x96:
case 0xa0:
case 0xab:
- setEncodingFor(out, in->getType());
out = in->get<std::string>();
+ setEncodingFor(out, in->getType());
break;
case 0xa8:
@@ -188,6 +191,28 @@ Variant toVariant(boost::shared_ptr<Fiel
return out;
}
+boost::shared_ptr<FieldValue> convertString(const std::string& value, const std::string& encoding)
+{
+ bool large = value.size() > std::numeric_limits<uint16_t>::max();
+ if (encoding.empty() || encoding == amqp0_10_binary || encoding == binary) {
+ if (large) {
+ return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0));
+ } else {
+ return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x90));
+ }
+ } else if (encoding == utf8 && !large) {
+ return boost::shared_ptr<FieldValue>(new Str16Value(value));
+ } else if (encoding == utf16 && !large) {
+ return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x96));
+ } else if (encoding == iso885915 && !large) {
+ return boost::shared_ptr<FieldValue>(new Var16Value(value, 0x94));
+ } else {
+ //either the string is too large for the encoding in amqp 0-10, or the encoding was not recognised
+ QPID_LOG(warning, "Could not encode " << value.size() << " byte value as " << encoding << ", encoding as vbin32.");
+ return boost::shared_ptr<FieldValue>(new Var32Value(value, 0xa0));
+ }
+}
+
boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
{
boost::shared_ptr<FieldValue> out;
@@ -204,8 +229,7 @@ boost::shared_ptr<FieldValue> toFieldVal
case VAR_INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
case VAR_FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
case VAR_DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
- //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
- case VAR_STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+ case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break;
case VAR_UUID: out = boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); break;
case VAR_MAP:
out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-/*
+ /*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,8 +20,9 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/intrusive_ptr.hpp>
@@ -33,13 +34,42 @@ namespace amqp0_10 {
using qpid::messaging::Variant;
using qpid::framing::Uuid;
-using namespace qpid::sys;
-template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
+}
+
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ QPID_LOG(debug, "option " << key << " not specified");
+ return false;
+ }
+}
+
+template <>
+bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ if (i->second.getType() == qpid::messaging::VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -59,24 +89,47 @@ void convert(const Variant::Map& from, C
setIfFound(from, "max-channels", to.maxChannels);
setIfFound(from, "max-frame-size", to.maxFrameSize);
setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "protocol", to.protocol);
}
ConnectionImpl::ConnectionImpl(const Variant::Map& options) :
- reconnectionEnabled(true), timeout(-1),
- minRetryInterval(1), maxRetryInterval(30)
+ reconnect(true), timeout(-1), limit(-1),
+ minReconnectInterval(3), maxReconnectInterval(60),
+ retries(0)
+{
+ QPID_LOG(debug, "Created connection with " << options);
+ setOptions(options);
+}
+
+void ConnectionImpl::setOptions(const Variant::Map& options)
{
- QPID_LOG(debug, "Opening connection to " << url << " with " << options);
convert(options, settings);
- setIfFound(options, "reconnection-enabled", reconnectionEnabled);
- setIfFound(options, "reconnection-timeout", timeout);
- setIfFound(options, "min-retry-interval", minRetryInterval);
- setIfFound(options, "max-retry-interval", maxRetryInterval);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "min-reconnect-interval", minReconnectInterval);
+ setIfFound(options, "max-reconnect-interval", maxReconnectInterval);
+ }
+ setIfFound(options, "urls", urls);
+}
+
+void ConnectionImpl::setOption(const std::string& name, const Variant& value)
+{
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
+ QPID_LOG(debug, "Set " << name << " to " << value);
}
void ConnectionImpl::open(const std::string& u)
{
- url = u;
- connection.open(url, settings);
+ urls.push_back(u);
+ connect();
}
void ConnectionImpl::close()
@@ -97,7 +150,7 @@ void ConnectionImpl::close()
boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
{
return boost::dynamic_pointer_cast<SessionImpl>(
- qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session)
+ qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session)
);
}
@@ -134,64 +187,65 @@ qpid::messaging::Session ConnectionImpl:
try {
getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
- reconnect();
+ connect();
}
return impl;
}
-void ConnectionImpl::reconnect()
+void ConnectionImpl::connect()
{
- AbsTime start = now();
- ScopedLock<Semaphore> l(semaphore);
+ qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
if (!connection.isOpen()) connect(start);
}
-bool expired(const AbsTime& start, int timeout)
+bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
if (timeout < 0) return false;
- Duration used(start, now());
- Duration allowed = timeout * TIME_SEC;
- return allowed > used;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
+ return allowed < used;
}
-void ConnectionImpl::connect(const AbsTime& started)
+void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
- for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
- if (expired(started, timeout)) throw TransportFailure();
+ for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+ if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
+ if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
+ if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
else qpid::sys::sleep(i);
}
+ retries = 0;
}
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(url) ||
- (failoverListener.get() && tryConnect(failoverListener->getKnownBrokers())))
- {
- return resetSessions();
- } else {
- return false;
- }
+ if (tryConnect(urls)) return resetSessions();
+ else return false;
}
-bool ConnectionImpl::tryConnect(const Url& u)
+bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
{
- try {
- QPID_LOG(info, "Trying to connect to " << url << "...");
- connection.open(u, settings);
- failoverListener.reset(new FailoverListener(connection));
- return true;
- } catch (const Exception& e) {
- //TODO: need to fix timeout on open so that it throws TransportFailure
- QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
- }
- return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
-{
- for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
- if (tryConnect(*i)) return true;
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
+ QPID_LOG(info, "Connected to " << *i);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
}
return false;
}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Mar 23 18:00:49 2010
@@ -25,7 +25,6 @@
#include "qpid/messaging/Variant.h"
#include "qpid/Url.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/FailoverListener.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
@@ -46,7 +45,8 @@ class ConnectionImpl : public qpid::mess
qpid::messaging::Session newSession(bool transactional, const std::string& name);
qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
- void reconnect();
+ void connect();
+ void setOption(const std::string& name, const qpid::messaging::Variant& value);
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -54,18 +54,19 @@ class ConnectionImpl : public qpid::mess
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- std::auto_ptr<FailoverListener> failoverListener;
- qpid::Url url;
+ std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnectionEnabled;
- int timeout;
- int minRetryInterval;
- int maxRetryInterval;
+ bool reconnect;
+ int64_t timeout;
+ int32_t limit;
+ int64_t minReconnectInterval;
+ int64_t maxReconnectInterval;
+ int32_t retries;
+ void setOptions(const qpid::messaging::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
- bool tryConnect(const std::vector<Url>& urls);
- bool tryConnect(const Url&);
+ bool tryConnect(const std::vector<std::string>& urls);
bool resetSessions();
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Tue Mar 23 18:00:49 2010
@@ -57,14 +57,14 @@ qpid::messaging::Message ReceiverImpl::f
bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Get f(*this, message, timeout);
- while (!parent.execute(f)) {}
+ while (!parent->execute(f)) {}
return f.result;
}
bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Fetch f(*this, message, timeout);
- while (!parent.execute(f)) {}
+ while (!parent->execute(f)) {}
return f.result;
}
@@ -112,7 +112,7 @@ void ReceiverImpl::init(qpid::client::As
}
if (state == CANCELLED) {
source->cancel(session, destination);
- parent.receiverCancelled(destination);
+ parent->receiverCancelled(destination);
} else {
source->subscribe(session, destination);
start();
@@ -129,23 +129,23 @@ uint32_t ReceiverImpl::getCapacity()
uint32_t ReceiverImpl::available()
{
- return parent.available(destination);
+ return parent->available(destination);
}
uint32_t ReceiverImpl::pendingAck()
{
- return parent.pendingAck(destination);
+ return parent->pendingAck(destination);
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a) :
- parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
- return parent.get(*this, message, timeout);
+ return parent->get(*this, message, timeout);
}
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -172,7 +172,7 @@ void ReceiverImpl::closeImpl()
if (state != CANCELLED) {
state = CANCELLED;
source->cancel(session, destination);
- parent.receiverCancelled(destination);
+ parent->receiverCancelled(destination);
}
}
@@ -188,7 +188,7 @@ void ReceiverImpl::setCapacityImpl(uint3
}
qpid::messaging::Session ReceiverImpl::getSession() const
{
- return qpid::messaging::Session(&parent);
+ return qpid::messaging::Session(parent.get());
}
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/messaging/Duration.h"
+#include <boost/intrusive_ptr.hpp>
#include <memory>
namespace qpid {
@@ -65,7 +66,7 @@ class ReceiverImpl : public qpid::messag
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
private:
- SessionImpl& parent;
+ boost::intrusive_ptr<SessionImpl> parent;
const std::string destination;
const qpid::messaging::Address address;
const uint32_t byteCredit;
@@ -133,13 +134,13 @@ class ReceiverImpl : public qpid::messag
template <class F> void execute()
{
F f(*this);
- parent.execute(f);
+ parent->execute(f);
}
template <class F, class P> void execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ parent->execute(f);
}
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Mar 23 18:00:49 2010
@@ -31,17 +31,17 @@ namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address) :
- parent(_parent), name(_name), address(_address), state(UNRESOLVED),
+ parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
void SenderImpl::send(const qpid::messaging::Message& message)
{
if (unreliable) {
UnreliableSend f(*this, &message);
- parent.execute(f);
+ parent->execute(f);
} else {
Send f(*this, &message);
- while (f.repeat) parent.execute(f);
+ while (f.repeat) parent->execute(f);
}
}
@@ -60,7 +60,7 @@ uint32_t SenderImpl::getCapacity() { ret
uint32_t SenderImpl::pending()
{
CheckPendingSends f(*this, false);
- parent.execute(f);
+ parent->execute(f);
return f.pending;
}
@@ -73,7 +73,7 @@ void SenderImpl::init(qpid::client::Asyn
}
if (state == CANCELLED) {
sink->cancel(session, name);
- parent.senderCancelled(name);
+ parent->senderCancelled(name);
} else {
sink->declare(session, name);
replay();
@@ -140,7 +140,7 @@ void SenderImpl::closeImpl()
{
state = CANCELLED;
sink->cancel(session, name);
- parent.senderCancelled(name);
+ parent->senderCancelled(name);
}
const std::string& SenderImpl::getName() const
@@ -150,7 +150,7 @@ const std::string& SenderImpl::getName()
qpid::messaging::Session SenderImpl::getSession() const
{
- return qpid::messaging::Session(&parent);
+ return qpid::messaging::Session(parent.get());
}
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include <memory>
+#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
namespace qpid {
@@ -58,7 +59,7 @@ class SenderImpl : public qpid::messagin
qpid::messaging::Session getSession() const;
private:
- SessionImpl& parent;
+ boost::intrusive_ptr<SessionImpl> parent;
const std::string name;
const qpid::messaging::Address address;
State state;
@@ -143,13 +144,13 @@ class SenderImpl : public qpid::messagin
template <class F> void execute()
{
F f(*this);
- parent.execute(f);
+ parent->execute(f);
}
template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- return parent.execute(f);
+ return parent->execute(f);
}
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Mar 23 18:00:49 2010
@@ -24,7 +24,7 @@
#include "qpid/client/amqp0_10/SenderImpl.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
-#include "qpid/client/PrivateImplRef.h"
+#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
@@ -49,7 +49,7 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
void SessionImpl::sync()
@@ -108,13 +108,13 @@ void SessionImpl::close()
for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
- connection.closed(*this);
+ connection->closed(*this);
session.close();
}
template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
{
- return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
+ return boost::dynamic_pointer_cast<S>(qpid::messaging::PrivateImplRef<T>::get(t));
}
template <class T> void getFreeKey(std::string& key, T& map)
@@ -431,12 +431,12 @@ void SessionImpl::senderCancelled(const
void SessionImpl::reconnect()
{
- connection.reconnect();
+ connection->connect();
}
qpid::messaging::Connection SessionImpl::getConnection() const
{
- return qpid::messaging::Connection(&connection);
+ return qpid::messaging::Connection(connection.get());
}
}}} // namespace qpid::client::amqp0_10
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Tue Mar 23 18:00:49 2010
@@ -29,6 +29,7 @@
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -106,7 +107,7 @@ class SessionImpl : public qpid::messagi
typedef std::map<std::string, qpid::messaging::Sender> Senders;
mutable qpid::sys::Mutex lock;
- ConnectionImpl& connection;
+ boost::intrusive_ptr<ConnectionImpl> connection;
qpid::client::Session session;
AddressResolution resolver;
IncomingMessages incoming;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp Tue Mar 23 18:00:49 2010
@@ -25,6 +25,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/SecuritySettings.h"
#include "qpid/log/Statement.h"
#include "boost/tokenizer.hpp"
@@ -33,6 +34,7 @@ namespace qpid {
namespace client {
using qpid::sys::SecurityLayer;
+using qpid::sys::SecuritySettings;
using qpid::framing::InternalErrorException;
class WindowsSasl : public Sasl
@@ -40,7 +42,7 @@ class WindowsSasl : public Sasl
public:
WindowsSasl(const ConnectionSettings&);
~WindowsSasl();
- std::string start(const std::string& mechanisms, unsigned int ssf);
+ std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -91,7 +93,7 @@ WindowsSasl::~WindowsSasl()
}
std::string WindowsSasl::start(const std::string& mechanisms,
- unsigned int /*ssf*/)
+ const SecuritySettings* /*externalSettings*/)
{
QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 23 18:00:49 2010
@@ -16,7 +16,8 @@
*
*/
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
*
* The cluster works on the principle that if all members of the
* cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
*
* The following are the current areas where broker uses timers or timestamps:
*
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ * connection::getClusterOrderOutput, a FrameHandler that sends
+ * frames to the client via the cluster. Used by broker::SessionState
*
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ * implemented by cluster::ExpiryPolicy.
*
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ * session command counting so OK to ignore.
*
* - LinkRegistry: only cluster elder is ever active for links.
*
@@ -57,7 +61,10 @@
*
* cluster::ExpiryPolicy implements the strategy for message expiry.
*
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
*
* Messages sent to/from CPG are called Events.
*
@@ -84,12 +91,16 @@
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
*
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
*
* When a new member joins the CPG group, all members (including the
* new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster it is
+ * then in INIT mode until the configured cluster-size members have
+ * joined.
*
* The newcomer uses initial status to determine
* - The cluster UUID
@@ -97,11 +108,16 @@
* - Do I need to get an update from an existing active member?
* - Can I recover from my own store?
*
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until initial-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
@@ -191,16 +207,19 @@ struct ClusterDispatcher : public framin
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
uint8_t storeState, const Uuid& shutdownId,
- const framing::SequenceNumber& configSeq,
const std::string& firstConfig)
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId, configSeq,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
- void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& current) { cluster.configChange(member, current, l); }
+ void ready(const std::string& url) {
+ cluster.ready(member, url, l);
+ }
+ void configChange(const std::string& current) {
+ cluster.configChange(member, current, l);
+ }
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
}
@@ -241,7 +260,7 @@ Cluster::Cluster(const ClusterSettings&
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
- state(INIT),
+ state(PRE_INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
@@ -271,17 +290,18 @@ Cluster::Cluster(const ClusterSettings&
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- if (store.getState() == STORE_STATE_DIRTY_STORE)
- broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
-
cpg.join(name);
+ // pump the CPG dispatch manually till we get past PRE_INIT.
+ while (state == PRE_INIT)
+ cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -298,9 +318,14 @@ void Cluster::initialize() {
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+ mcast.start();
+
+ // Run initMapCompleted immediately to process the initial configuration.
+ assert(state == INIT);
+ initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
// Add finalizer last for exception safety.
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -510,8 +535,13 @@ ConnectionPtr Cluster::getConnection(con
assert(cp);
}
else { // New remote connection, create a shadow.
- unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
- cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf);
+ qpid::sys::SecuritySettings secSettings;
+ if (announce) {
+ secSettings.ssf = announce->getSsf();
+ secSettings.authid = announce->getAuthid();
+ secSettings.nodict = announce->getNodict();
+ }
+ cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings);
}
connections.insert(ConnectionMap::value_type(id, cp));
}
@@ -571,9 +601,27 @@ void Cluster::setReady(Lock&) {
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
- if (state == INIT) {
- // We have status for all members so we can make join descisions.
+ if (state == PRE_INIT) {
+ // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+ // We decide here whether we want to recover from our store.
+ // We won't recover if we are joining an active cluster or our store is dirty.
+ if (store.hasStore() &&
+ (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+ broker.setRecovery(false); // Ditch my current store.
+ state = INIT;
+ }
+ else if (state == INIT) {
+ // INIT means we are past Cluster::initialize().
+
+ // If we're forming an initial cluster (no active members)
+ // then we wait to reach the configured cluster-size
+ if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+ QPID_LOG(info, *this << initMap.getActualSize()
+ << " members, waiting for at least " << initMap.getRequiredSize());
+ return;
+ }
initMap.checkConsistent();
+
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -626,7 +674,7 @@ void Cluster::configChange(const MemberI
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(), store.getConfigSeq(),
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -668,15 +716,13 @@ struct AppendQueue {
// Log a snapshot of broker state, used for debugging inconsistency problems.
// May only be called in deliver thread.
-void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
+std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << prefix;
- if (connection) msg << " " << connection->getId();
- msg << " snapshot " << map.getFrameSeq() << ":";
+ msg << "queue snapshot at " << map.getFrameSeq() << ":";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
- QPID_LOG(trace, msg.str());
+ return msg.str();
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -702,7 +748,6 @@ void Cluster::initialStatus(const Member
const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
- const framing::SequenceNumber& configSeq,
const std::string& firstConfig,
Lock& l)
{
@@ -715,7 +760,7 @@ void Cluster::initialStatus(const Member
initMap.received(
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, configSeq, firstConfig)
+ store, shutdownId, firstConfig)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
@@ -762,8 +807,9 @@ void Cluster::updateOffer(const MemberId
deliverEventQueue.start(); // Not involved in update.
}
if (updatee != self && url) {
- debugSnapshot("join");
+ QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
+ // Updatee will call clusterUpdate when update completes
}
}
@@ -836,7 +882,7 @@ void Cluster::checkUpdateIn(Lock& l) {
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
- debugSnapshot("initial");
+ QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
deliverEventQueue.start();
}
@@ -963,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
- "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+ "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1003,12 +1050,14 @@ void Cluster::errorCheck(const MemberId&
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- timer->deliverWakeup(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org