You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [11/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,58 +34,57 @@
#include <boost/intrusive_ptr.hpp>
namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
- class Commit{
- boost::intrusive_ptr<Message>& msg;
- public:
- Commit(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
- class Rollback{
- boost::intrusive_ptr<Message>& msg;
- public:
- Rollback(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
-
- boost::intrusive_ptr<Message> msg;
- std::list<boost::shared_ptr<Queue> > queues;
- std::list<boost::shared_ptr<Queue> > prepared;
-
- void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
- public:
- QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
- QPID_BROKER_EXTERN virtual void commit() throw();
- QPID_BROKER_EXTERN virtual void rollback() throw();
-
- virtual Message& getMessage() { return *msg; };
-
- QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
- virtual ~TxPublish(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
- QPID_BROKER_EXTERN uint64_t contentSize();
-
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
- const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
- const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
+ namespace broker {
+ /**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+ class TxPublish : public TxOp, public Deliverable{
+
+ class Commit{
+ boost::intrusive_ptr<Message>& msg;
+ public:
+ Commit(boost::intrusive_ptr<Message>& msg);
+ void operator()(const boost::shared_ptr<Queue>& queue);
+ };
+ class Rollback{
+ boost::intrusive_ptr<Message>& msg;
+ public:
+ Rollback(boost::intrusive_ptr<Message>& msg);
+ void operator()(const boost::shared_ptr<Queue>& queue);
+ };
+
+ boost::intrusive_ptr<Message> msg;
+ std::list<boost::shared_ptr<Queue> > queues;
+ std::list<boost::shared_ptr<Queue> > prepared;
+
+ void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
+
+ public:
+ QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
+ QPID_BROKER_EXTERN virtual void commit() throw();
+ QPID_BROKER_EXTERN virtual void rollback() throw();
+
+ virtual Message& getMessage() { return *msg; };
+
+ QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
+
+ virtual ~TxPublish(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ QPID_BROKER_EXTERN uint64_t contentSize();
+
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
+ };
+ }
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp Fri Oct 21 01:19:00 2011
@@ -31,16 +31,10 @@ const std::string Broker::Options::DEFAU
std::string
Broker::Options::getHome() {
std::string home;
-#ifdef _MSC_VER
char home_c[MAX_PATH+1];
size_t unused;
if (0 == getenv_s (&unused, home_c, sizeof(home_c), "HOME"))
home += home_c;
-#else
- char *home_c = getenv("HOME");
- if (home_c)
- home += home_c;
-#endif
return home;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Fri Oct 21 01:19:00 2011
@@ -42,7 +42,7 @@ public:
NullAuthenticator(Connection& connection);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string* response);
+ void start(const std::string& mechanism, const std::string& response);
void step(const std::string&) {}
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -57,7 +57,7 @@ public:
SspiAuthenticator(Connection& connection);
~SspiAuthenticator();
void getMechanisms(framing::Array& mechanisms);
- void start(const std::string& mechanism, const std::string* response);
+ void start(const std::string& mechanism, const std::string& response);
void step(const std::string& response);
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
@@ -93,15 +93,14 @@ NullAuthenticator::~NullAuthenticator()
void NullAuthenticator::getMechanisms(Array& mechanisms)
{
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
- mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));
}
-void NullAuthenticator::start(const string& mechanism, const string* response)
+void NullAuthenticator::start(const string& mechanism, const string& response)
{
QPID_LOG(warning, "SASL: No Authentication Performed");
if (mechanism == "PLAIN") { // Old behavior
- if (response && response->size() > 0 && (*response).c_str()[0] == (char) 0) {
- string temp = response->substr(1);
+ if (response.size() > 0 && response[0] == (char) 0) {
+ string temp = response.substr(1);
string::size_type i = temp.find((char)0);
string uid = temp.substr(0, i);
string pwd = temp.substr(i + 1);
@@ -139,7 +138,7 @@ void SspiAuthenticator::getMechanisms(Ar
QPID_LOG(info, "SASL: Mechanism list: ANONYMOUS PLAIN");
}
-void SspiAuthenticator::start(const string& mechanism, const string* response)
+void SspiAuthenticator::start(const string& mechanism, const string& response)
{
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
if (mechanism == "ANONYMOUS") {
@@ -152,19 +151,16 @@ void SspiAuthenticator::start(const stri
// PLAIN's response is composed of 3 strings separated by 0 bytes:
// authorization id, authentication id (user), clear-text password.
- if (!response || response->size() == 0)
+ if (response.size() == 0)
throw ConnectionForcedException("Authentication failed");
- string::size_type i = response->find((char)0);
- string auth = response->substr(0, i);
- string::size_type j = response->find((char)0, i+1);
- string uid = response->substr(i+1, j-1);
- string pwd = response->substr(j+1);
- string dot(".");
+ string::size_type i = response.find((char)0);
+ string auth = response.substr(0, i);
+ string::size_type j = response.find((char)0, i+1);
+ string uid = response.substr(i+1, j-1);
+ string pwd = response.substr(j+1);
int error = 0;
- if (!LogonUser(const_cast<char*>(uid.c_str()),
- const_cast<char*>(dot.c_str()),
- const_cast<char*>(pwd.c_str()),
+ if (!LogonUser(uid.c_str(), ".", pwd.c_str(),
LOGON32_LOGON_NETWORK,
LOGON32_PROVIDER_DEFAULT,
&userToken))
@@ -180,7 +176,7 @@ void SspiAuthenticator::start(const stri
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
}
-void SspiAuthenticator::step(const string& /*response*/)
+void SspiAuthenticator::step(const string& response)
{
QPID_LOG(info, "SASL: Need another step!!!");
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Oct 21 01:19:00 2011
@@ -27,14 +27,10 @@
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/windows/SslAsynchIO.h"
-
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
#include <memory>
-
// security.h needs to see this to distinguish from kernel use.
#define SECURITY_WIN32
#include <security.h>
@@ -72,10 +68,9 @@ struct SslServerOptions : qpid::Options
};
class SslProtocolFactory : public qpid::sys::ProtocolFactory {
+ qpid::sys::Socket listener;
const bool tcpNoDelay;
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- uint16_t listeningPort;
+ const uint16_t listeningPort;
std::string brokerHost;
const bool clientAuthSelected;
std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
@@ -83,14 +78,15 @@ class SslProtocolFactory : public qpid::
CredHandle credHandle;
public:
- SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay);
+ SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
~SslProtocolFactory();
void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
- void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port,
sys::ConnectionCodec::Factory*,
ConnectFailedCallback failed);
uint16_t getPort() const;
+ std::string getHost() const;
bool supports(const std::string& capability);
private:
@@ -119,7 +115,6 @@ static struct SslPlugin : public Plugin
try {
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- "", boost::lexical_cast<std::string>(options.port),
opts.connectionBacklog,
opts.tcpNoDelay));
QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
@@ -132,13 +127,12 @@ static struct SslPlugin : public Plugin
} sslPlugin;
SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
- const std::string& host, const std::string& port, int backlog,
+ int backlog,
bool nodelay)
: tcpNoDelay(nodelay),
+ listeningPort(listener.listen(options.port, backlog)),
clientAuthSelected(options.clientAuth) {
- // Make sure that certificate store is good before listening to sockets
- // to avoid having open and listening sockets when there is no cert store
SecInvalidateHandle(&credHandle);
// Get the certificate for this server.
@@ -183,23 +177,6 @@ SslProtocolFactory::SslProtocolFactory(c
throw QPID_WINDOWS_ERROR(status);
::CertFreeCertificateContext(certContext);
::CertCloseStore(certStoreHandle, 0);
-
- // Listen to socket(s)
- SocketAddress sa(host, port);
-
- // We must have at least one resolved address
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
- listeningPort = s->listen(sa, backlog);
- listeners.push_back(s);
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
- s->listen(sa, backlog);
- listeners.push_back(s);
- }
}
SslProtocolFactory::~SslProtocolFactory() {
@@ -260,19 +237,21 @@ uint16_t SslProtocolFactory::getPort() c
return listeningPort; // Immutable no need for lock.
}
+std::string SslProtocolFactory::getHost() const {
+ return listener.getSockname();
+}
+
void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
sys::ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
- acceptors[i].start(poller);
- }
+ acceptor.reset(
+ AsynchAcceptor::create(listener,
+ boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptor->start(poller);
}
void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
const std::string& host,
- const std::string& port,
+ int16_t port,
sys::ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@
#include "qpid/client/ConnectionHandler.h"
#include "qpid/SaslFactory.h"
-#include "qpid/StringUtils.h"
#include "qpid/client/Bounds.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
@@ -143,9 +142,7 @@ void ConnectionHandler::outgoing(AMQFram
void ConnectionHandler::waitForOpen()
{
waitFor(ESTABLISHED);
- if (getState() == FAILED) {
- throw TransportFailure(errorText);
- } else if (getState() == CLOSED) {
+ if (getState() == FAILED || getState() == CLOSED) {
throw ConnectionException(errorCode, errorText);
}
}
@@ -205,24 +202,6 @@ void ConnectionHandler::fail(const std::
namespace {
std::string SPACE(" ");
-
-std::string join(const std::vector<std::string>& in)
-{
- std::string result;
- for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
- if (result.size()) result += SPACE;
- result += *i;
- }
- return result;
-}
-
-void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results)
-{
- for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) {
- if (std::find(b.begin(), b.end(), *i) != b.end()) results.push_back(*i);
- }
-}
-
}
void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
@@ -237,35 +216,26 @@ void ConnectionHandler::start(const Fiel
maxSsf
);
- std::vector<std::string> mechlist;
- if (mechanism.empty()) {
- //mechlist is simply what the server offers
- mechanisms.collect(mechlist);
- } else {
- //mechlist is the intersection of those indicated by user and
- //those supported by server, in the order listed by user
- std::vector<std::string> allowed = split(mechanism, " ");
- std::vector<std::string> supported;
- mechanisms.collect(supported);
- intersection(allowed, supported, mechlist);
- if (mechlist.empty()) {
- throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")"));
+ std::string mechlist;
+ bool chosenMechanismSupported = mechanism.empty();
+ for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
+ if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
+ chosenMechanismSupported = true;
+ mechlist = (*i)->get<std::string>() + SPACE + mechlist;
+ } else {
+ if (i != mechanisms.begin()) mechlist += SPACE;
+ mechlist += (*i)->get<std::string>();
}
}
+ if (!chosenMechanismSupported) {
+ fail("Selected mechanism not supported: " + mechanism);
+ }
+
if (sasl.get()) {
- string response;
- if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) {
- proxy.startOk(properties, sasl->getMechanism(), response, locale);
- } else {
- //response was null
- ConnectionStartOkBody body;
- body.setClientProperties(properties);
- body.setMechanism(sasl->getMechanism());
- //Don't set response, as none was given
- body.setLocale(locale);
- proxy.send(body);
- }
+ string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
+ getSecuritySettings ? getSecuritySettings() : 0);
+ proxy.startOk(properties, sasl->getMechanism(), response, locale);
} else {
//TODO: verify that desired mechanism and locale are supported
string response = ((char)0) + username + ((char)0) + password;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -36,7 +36,6 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>
#include <limits>
@@ -259,16 +258,16 @@ void ConnectionImpl::open()
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
try {
- std::string p = boost::lexical_cast<std::string>(port);
- connector->connect(host, p);
-
+ connector->connect(host, port);
+
} catch (const std::exception& e) {
QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
connector.reset();
- throw TransportFailure(e.what());
+ throw;
}
connector->init();
-
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+
// Enable heartbeat if requested
uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
if (heartbeat) {
@@ -282,7 +281,6 @@ void ConnectionImpl::open()
// - in that case in connector.reset() above;
// - or when we are deleted
handler.waitForOpen();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h Fri Oct 21 01:19:00 2011
@@ -61,7 +61,7 @@ class Connector : public framing::Output
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
- virtual void connect(const std::string& host, const std::string& port) = 0;
+ virtual void connect(const std::string& host, int port) = 0;
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp Fri Oct 21 01:19:00 2011
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector,
std::string identifier;
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
}
}
-void RdmaConnector::connect(const std::string& host, const std::string& port){
+void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(dataConnectedLock);
assert(!dataConnected);
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::s
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- SocketAddress sa(host, port);
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
acon->start(poller, sa);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -170,7 +170,6 @@ Demux& SessionImpl::getDemux()
void SessionImpl::waitForCompletion(const SequenceNumber& id)
{
Lock l(state);
- sys::Waitable::ScopedWait w(state);
waitForCompletionImpl(id);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp Fri Oct 21 01:19:00 2011
@@ -114,7 +114,7 @@ class SslConnector : public Connector
std::string identifier;
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void init();
void close();
void send(framing::AMQFrame& frame);
@@ -190,14 +190,14 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(closedLock);
assert(closed);
try {
socket.connect(host, port);
} catch (const std::exception& e) {
socket.close();
- throw TransportFailure(e.what());
+ throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp Fri Oct 21 01:19:00 2011
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
close();
}
-void TCPConnector::connect(const std::string& host, const std::string& port) {
+void TCPConnector::connect(const std::string& host, int port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
@@ -117,11 +117,11 @@ void TCPConnector::connected(const Socke
void TCPConnector::start(sys::AsynchIO* aio_) {
aio = aio_;
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 32; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
- identifier = str(format("[%1%]") % socket.getFullAddress());
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
}
void TCPConnector::initAmqp() {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h Fri Oct 21 01:19:00 2011
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, p
protected:
virtual ~TCPConnector();
- void connect(const std::string& host, const std::string& port);
+ void connect(const std::string& host, int port);
void start(sys::AsynchIO* aio_);
void initAmqp();
virtual void connectFailed(const std::string& msg);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Fri Oct 21 01:19:00 2011
@@ -30,23 +30,12 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
{
- SequenceSet accepting;
- if (cumulative) {
- for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
- accepting.add(*i);
- }
- unconfirmed.add(accepting);
- unaccepted.remove(accepting);
- } else {
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
- accepting.add(id);
- }
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
}
- return accepting;
}
void AcceptTracker::State::release()
@@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std:
destinationState[destination].unaccepted.add(id);
}
-namespace
-{
-const size_t FLUSH_FREQUENCY = 1024;
-}
-
-void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
-{
- pending.push_back(record);
- if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
-}
-
-
void AcceptTracker::accept(qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
@@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client:
Record record;
record.status = session.messageAccept(aggregateState.unaccepted);
record.accepted = aggregateState.unaccepted;
- addToPending(session, record);
+ pending.push_back(record);
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id, cumulative);
+ i->second.accept(id);
}
Record record;
- record.accepted = aggregateState.accept(id, cumulative);
+ record.accepted.add(id);
record.status = session.messageAccept(record.accepted);
- addToPending(session, record);
+ pending.push_back(record);
+ aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Fri Oct 21 01:19:00 2011
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
+ void accept(qpid::framing::SequenceNumber);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
@@ -79,7 +79,6 @@ class AcceptTracker
StateMap destinationState;
Records pending;
- void addToPending(qpid::client::AsyncSession&, const Record&);
void checkPending();
void completed(qpid::framing::SequenceSet&);
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Fri Oct 21 01:19:00 2011
@@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("head
const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("#");
-//exchange prefixes:
-const std::string PREFIX_AMQ("amq.");
-const std::string PREFIX_QPID("qpid.");
-
const Verifier verifier;
}
@@ -203,7 +199,6 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- bool isReservedName();
protected:
const std::string specifiedType;
@@ -238,8 +233,6 @@ class Subscription : public Exchange, pu
const bool reliable;
const bool durable;
const std::string actualType;
- const bool exclusiveQueue;
- const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -314,7 +307,6 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
- bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -346,12 +338,6 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
-bool Opt::asBool(bool defaultValue) const
-{
- if (value) return value->asBool();
- else return defaultValue;
-}
-
std::string Opt::str() const
{
if (value) return value->asString();
@@ -495,7 +481,7 @@ std::string Subscription::getSubscriptio
if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return name;
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
@@ -504,9 +490,7 @@ Subscription::Subscription(const Address
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
- exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::clien
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::clien
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+ session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
@@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : N
linkBindings.setDefaultExchange(name);
}
-bool Exchange::isReservedName()
-{
- return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
-}
-
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
- if (isReservedName()) {
- try {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } catch (const qpid::framing::NotFoundException& /*e*/) {
- throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
- }
-
- } else {
- std::string type = specifiedType;
- if (type.empty()) type = TOPIC_EXCHANGE;
- session.exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
- }
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
nodeBindings.bind(session);
session.sync();
} catch (const qpid::framing::NotAllowedException& e) {
@@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client:
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (i->second != v) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -20,6 +20,7 @@
*/
#include "ConnectionImpl.h"
#include "SessionImpl.h"
+#include "SimpleUrlParser.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
@@ -38,18 +39,26 @@ using qpid::types::Variant;
using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
-namespace {
-void merge(const std::string& value, std::vector<std::string>& list) {
- if (std::find(list.begin(), list.end(), value) == list.end())
- list.push_back(value);
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+ to.push_back(i->asString());
+ }
}
-void merge(const Variant::List& from, std::vector<std::string>& to)
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
- merge(i->asString(), to);
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value = (T) i->second;
+ QPID_LOG(debug, "option " << key << " specified as " << i->second);
+ return true;
+ } else {
+ return false;
+ }
}
+namespace {
std::string asString(const std::vector<std::string>& v) {
std::stringstream os;
os << "[";
@@ -62,8 +71,49 @@ std::string asString(const std::vector<s
}
}
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value.clear();
+ if (i->second.getType() == VAR_LIST) {
+ convert(i->second.asList(), value);
+ } else {
+ value.push_back(i->second.asString());
+ }
+ QPID_LOG(debug, "option " << key << " specified as " << asString(value));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+ setIfFound(from, "username", to.username);
+ setIfFound(from, "password", to.password);
+ setIfFound(from, "sasl-mechanism", to.mechanism);
+ setIfFound(from, "sasl-service", to.service);
+ setIfFound(from, "sasl-min-ssf", to.minSsf);
+ setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+ setIfFound(from, "heartbeat", to.heartbeat);
+ setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+ setIfFound(from, "locale", to.locale);
+ setIfFound(from, "max-channels", to.maxChannels);
+ setIfFound(from, "max-frame-size", to.maxFrameSize);
+ setIfFound(from, "bounds", to.bounds);
+
+ setIfFound(from, "transport", to.protocol);
+
+ setIfFound(from, "ssl-cert-name", to.sslCertName);
+}
+
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
+ reconnect(false), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
@@ -74,69 +124,27 @@ ConnectionImpl::ConnectionImpl(const std
void ConnectionImpl::setOptions(const Variant::Map& options)
{
- for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
- setOption(i->first, i->second);
+ sys::Mutex::ScopedLock l(lock);
+ convert(options, settings);
+ setIfFound(options, "reconnect", reconnect);
+ setIfFound(options, "reconnect-timeout", timeout);
+ setIfFound(options, "reconnect-limit", limit);
+ int64_t reconnectInterval;
+ if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+ minReconnectInterval = maxReconnectInterval = reconnectInterval;
+ } else {
+ setIfFound(options, "reconnect-interval-min", minReconnectInterval);
+ setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
+ setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
- sys::Mutex::ScopedLock l(lock);
- if (name == "reconnect") {
- reconnect = value;
- } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
- timeout = value;
- } else if (name == "reconnect-limit" || name == "reconnect_limit") {
- limit = value;
- } else if (name == "reconnect-interval" || name == "reconnect_interval") {
- maxReconnectInterval = minReconnectInterval = value;
- } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
- minReconnectInterval = value;
- } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
- maxReconnectInterval = value;
- } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
- replaceUrls = value.asBool();
- } else if (name == "reconnect-urls" || name == "reconnect_urls") {
- if (replaceUrls) urls.clear();
- if (value.getType() == VAR_LIST) {
- merge(value.asList(), urls);
- } else {
- merge(value.asString(), urls);
- }
- } else if (name == "username") {
- settings.username = value.asString();
- } else if (name == "password") {
- settings.password = value.asString();
- } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
- name == "sasl-mechanisms" || name == "sasl_mechanisms") {
- settings.mechanism = value.asString();
- } else if (name == "sasl-service" || name == "sasl_service") {
- settings.service = value.asString();
- } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
- settings.minSsf = value;
- } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
- settings.maxSsf = value;
- } else if (name == "heartbeat") {
- settings.heartbeat = value;
- } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
- settings.tcpNoDelay = value;
- } else if (name == "locale") {
- settings.locale = value.asString();
- } else if (name == "max-channels" || name == "max_channels") {
- settings.maxChannels = value;
- } else if (name == "max-frame-size" || name == "max_frame_size") {
- settings.maxFrameSize = value;
- } else if (name == "bounds") {
- settings.bounds = value;
- } else if (name == "transport") {
- settings.protocol = value.asString();
- } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
- settings.sslCertName = value.asString();
- } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
- reconnectOnLimitExceeded = value;
- } else {
- throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
- }
+ Variant::Map options;
+ options[name] = value;
+ setOptions(options);
}
@@ -206,7 +214,7 @@ qpid::messaging::Session ConnectionImpl:
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- reopen();
+ open();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -227,15 +235,6 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
-void ConnectionImpl::reopen()
-{
- if (!reconnect) {
- throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
- }
- open();
-}
-
-
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -263,9 +262,14 @@ void ConnectionImpl::connect(const qpid:
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
- merge(i->str(), urls);
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+ }
}
bool ConnectionImpl::tryConnect()
@@ -274,14 +278,21 @@ bool ConnectionImpl::tryConnect()
for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
- Url url(*i);
- if (url.getUser().size()) settings.username = url.getUser();
- if (url.getPass().size()) settings.password = url.getPass();
- connection.open(url, settings);
+ //TODO: when url support is more complete can avoid this test here
+ if (i->find("amqp:") == 0) {
+ Url url(*i);
+ connection.open(url, settings);
+ } else {
+ SimpleUrlParser::parse(*i, settings);
+ connection.open(settings);
+ }
QPID_LOG(info, "Connected to " << *i);
mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
- } catch (const qpid::TransportFailure& e) {
+ } catch (const qpid::ConnectionException& e) {
+ //TODO: need to fix timeout on
+ //qpid::client::Connection::open() so that it throws
+ //TransportFailure rather than a ConnectionException
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Oct 21 01:19:00 2011
@@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::mess
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
void open();
- void reopen();
bool isOpen() const;
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
@@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::mess
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
- bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
bool reconnect;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Fri Oct 21 01:19:00 2011
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session, cumulative);
+ acceptTracker.accept(id, session);
}
@@ -301,7 +301,6 @@ const std::string SUBJECT("qpid.subject"
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
-const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -335,13 +334,10 @@ void populateHeaders(qpid::messaging::Me
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, timestamp, others?
+ // routing-key, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
- if (deliveryProperties && deliveryProperties->hasTimestamp()) {
- message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
- }
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Fri Oct 21 01:19:00 2011
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id, bool cumulative);
+ void accept(qpid::framing::SequenceNumber id);
void releaseAll();
void releasePending(const std::string& destination);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Oct 21 01:19:00 2011
@@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
- if (from.getTtl().getMilliseconds()) {
- message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
- }
+ message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Oct 21 01:19:00 2011
@@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qp
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->message.setRedelivered(true);
sink->send(session, name, *i);
}
}
@@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(b
uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
{
if (flush) {
- session.flush();
+ session.flush();
flushed = true;
} else {
flushed = false;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl&
void SessionImpl::checkError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
s.get()->assertOpen();
}
bool SessionImpl::hasError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
return s.get()->hasError();
}
@@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messagin
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledge(qpid::messaging::Message& m)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- Acknowledge2 ack(*this, m, cumulative);
- execute(ack);
+ execute1<Acknowledge1>(m);
}
void SessionImpl::close()
@@ -131,29 +128,27 @@ void SessionImpl::close()
senders.clear();
receivers.clear();
} else {
- Senders sCopy;
- Receivers rCopy;
- {
- ScopedLock l(lock);
- senders.swap(sCopy);
- receivers.swap(rCopy);
- }
- for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
- {
- // outside the lock, will call senderCancelled
- i->second.close();
+ while (true) {
+ Sender s;
+ {
+ ScopedLock l(lock);
+ if (senders.empty()) break;
+ s = senders.begin()->second;
+ }
+ s.close(); // outside the lock, will call senderCancelled
}
- for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
- {
- // outside the lock, will call receiverCancelled
- i->second.close();
+ while (true) {
+ Receiver r;
+ {
+ ScopedLock l(lock);
+ if (receivers.empty()) break;
+ r = receivers.begin()->second;
+ }
+ r.close(); // outside the lock, will call receiverCancelled
}
}
connection->closed(*this);
- if (!hasError()) {
- ScopedLock l(lock);
- session.close();
- }
+ if (!hasError()) session.close();
}
template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
@@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksIm
void SessionImpl::syncImpl(bool block)
{
- {
- ScopedLock l(lock);
- if (block) session.sync();
- else session.flush();
- }
+ if (block) session.sync();
+ else session.flush();
//cleanup unconfirmed accept records:
incoming.pendingAccept();
}
@@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const
void SessionImpl::reconnect()
{
- connection->reopen();
+ connection->open();
}
bool SessionImpl::backoff()
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h Fri Oct 21 01:19:00 2011
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messagi
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg, bool cumulative);
+ void acknowledge(qpid::messaging::Message& msg);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messagi
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
+ void acknowledgeImpl(qpid::messaging::Message&);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,13 +204,12 @@ class SessionImpl : public qpid::messagi
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge2 : Command
+ struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
- bool cumulative;
- Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
- void operator()() { impl.acknowledgeImpl(message, cumulative); }
+ Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.acknowledgeImpl(message); }
};
struct CreateSender;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp Fri Oct 21 01:19:00 2011
@@ -71,7 +71,7 @@ class WindowsSasl : public Sasl
public:
WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int );
~WindowsSasl();
- bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
+ std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl()
{
}
-bool WindowsSasl::start(const std::string& mechanisms, std::string& response,
- const SecuritySettings* /*externalSettings*/)
+std::string WindowsSasl::start(const std::string& mechanisms,
+ const SecuritySettings* /*externalSettings*/)
{
QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
@@ -142,18 +142,18 @@ bool WindowsSasl::start(const std::strin
if (!haveAnon && !havePlain)
throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism"));
+ std::string resp = "";
if (havePlain) {
mechanism = PLAIN;
- response = ((char)0) + settings.username + ((char)0) + settings.password;
+ resp = ((char)0) + settings.username + ((char)0) + settings.password;
}
else {
mechanism = ANONYMOUS;
- response = "";
}
- return true;
+ return resp;
}
-std::string WindowsSasl::step(const std::string& /*challenge*/)
+std::string WindowsSasl::step(const std::string& challenge)
{
// Shouldn't get this for PLAIN...
throw InternalErrorException(QPID_MSG("Sasl step error"));
@@ -169,7 +169,7 @@ std::string WindowsSasl::getUserId()
return std::string(); // TODO - when GSSAPI is supported, return userId for connection.
}
-std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t /*maxFrameSize*/)
+std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize)
{
return std::auto_ptr<SecurityLayer>(0);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp Fri Oct 21 01:19:00 2011
@@ -77,7 +77,7 @@ public:
framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
- virtual void connect(const std::string& host, const std::string& port);
+ virtual void connect(const std::string& host, int port);
virtual void connected(const Socket&);
unsigned int getSSF();
};
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
// Will this get reach via virtual method via boost::bind????
-void SslConnector::connect(const std::string& host, const std::string& port) {
+void SslConnector::connect(const std::string& host, int port) {
brokerHost = host;
TCPConnector::connect(host, port);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp Fri Oct 21 01:19:00 2011
@@ -36,45 +36,45 @@
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
- *
+ *
*
* USE OF TIMESTAMPS IN THE BROKER
- *
+ *
* The following are the current areas where broker uses timers or timestamps:
- *
+ *
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput. a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState
- *
+ *
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
- *
+ *
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
- *
+ *
* - LinkRegistry: only cluster elder is ever active for links.
- *
+ *
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
+ *
+ * - Dtx: not yet supported with cluster.
*
- * cluster::ExpiryPolicy uses cluster time.
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
*
* ClusterTimer implements periodic timed events in the cluster context.
- * Used for:
- * - periodic management events.
- * - DTX transaction timeouts.
+ * Used for periodic management events.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ *
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
- *
+ *
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
* - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ *
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
@@ -146,7 +146,6 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpi
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -215,7 +214,7 @@ struct ClusterDispatcher : public framin
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
void ready(const std::string& url) {
@@ -231,21 +230,21 @@ struct ClusterDispatcher : public framin
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
- void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
- void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings&
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(*this)),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings&
lastBroker(false),
updateRetracted(false),
updateClosed(false),
- error(*this),
- acl(0)
+ error(*this)
{
- broker.setInCluster(true);
-
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
timer = new ClusterTimer(*this);
@@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings&
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- clusterId = store.getClusterId();
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
- assert(discarding);
+ assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!ib.second) assert(ib.second);
+ assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
+ Lock l(lock);
erase(id,l);
}
@@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() cons
std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
-}
+}
-void Cluster::leave() {
+void Cluster::leave() {
Lock l(lock);
leave(l);
}
@@ -410,7 +405,7 @@ void Cluster::leave() {
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -429,7 +424,7 @@ void Cluster::deliver(
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
+ // Only do this for the two brokers that are directly involved in this
// offer: the one making the offer, or the one receiving it.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event
}
deliverFrame(ef);
}
- else if(!discarding) {
+ else if(!discarding) {
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const Event
// the event queue.
e.frame = AMQFrame(
ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
+ deliverEventQueue.start();
}
// Process each frame through the error checker.
if (error.isUnresolved()) {
@@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const Event
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else
+ else
processFrame(e, l);
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
+ QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFr
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
- QPID_LOG_IF(trace, loggable(e.frame),
- *this << " DLVR " << map.getFrameSeq() << ": " << e);
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
connection->deliveredFrame(e);
}
else
- throw Exception(QPID_MSG("Unknown connection: " << e));
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
+ QPID_LOG(trace, *this << " DROP (joining): " << e);
}
// Called in deliverFrameQueue thread
@@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getCo
}
// CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
const cpg_address *members, int nMembers,
@@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) {
}
// Set the management status from the Cluster::state.
-//
+//
// NOTE: Management updates are sent based on property changes. In
// order to keep consistency across the cluster, we touch the local
// management status property even if it is locally unchanged for any
@@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) {
}
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
setMgmtStatus(l);
if (state == PRE_INIT) {
@@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l)
else { // I can go ready.
discarding = false;
setReady(l);
- // Must be called *before* memberUpdate so first update will be generated.
- failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -709,8 +701,8 @@ void Cluster::configChange(const MemberI
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -725,20 +717,6 @@ void Cluster::configChange(const MemberI
updateMgmtMembership(l); // Update on every config change for consistency
}
-struct ClusterClockTask : public sys::TimerTask {
- Cluster& cluster;
- sys::Timer& timer;
-
- ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
- : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
-
- void fire() {
- cluster.sendClockUpdate();
- setupNextFire();
- timer.add(this);
- }
-};
-
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
-
- clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() {
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
-//
+//
void Cluster::brokerShutdown() {
sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
@@ -799,7 +775,7 @@ void Cluster::updateRequest(const Member
}
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
+ const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
@@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId
else if (updatee == self && url) {
assert(state == JOINER);
state = UPDATEE;
- acl = broker.getAcl();
- broker.setAcl(0); // Disable ACL during update
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
@@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
+ // Updatee will call clusterUpdate when update completes
}
}
@@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
+ failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
- // Must be called *after* memberUpdate() to avoid sending an extra update.
- failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- broker.setAcl(acl); // Restore ACL
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
@@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) {
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- // Restore alternate exchange settings on exchanges.
- broker.getExchanges().eachExchange(
- boost::bind(&broker::Exchange::recoveryComplete, _1,
- boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) {
void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
+ QPID_LOG(error, *this << " error sending update: " << e.what());
updateOutDone(l);
}
@@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) {
void Cluster::updateMgmtMembership(Lock& l) {
if (!mgmtObject) return;
std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
if (i != urls.begin()) urlstr += ";";
@@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& u
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::
q->deliver(msg);
}
-sys::AbsTime Cluster::getClusterTime() {
- Mutex::ScopedLock l(lock);
- return clusterTime;
-}
-
-// This method is called during update on the updatee to set the initial cluster time.
-void Cluster::clock(const uint64_t time) {
- Mutex::ScopedLock l(lock);
- clock(time, l);
-}
-
-// called when broadcast message received
-void Cluster::clock(const uint64_t time, Lock&) {
- clusterTime = AbsTime(EPOCH, time);
- AbsTime now = AbsTime::now();
-
- if (!elder) {
- clusterTimeOffset = Duration(now, clusterTime);
- }
-}
-
-// called by elder timer to send clock broadcast
-void Cluster::sendClockUpdate() {
- Mutex::ScopedLock l(lock);
- int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
- nanosecondsSinceEpoch += clusterTimeOffset;
- mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
-}
-
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
@@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const st
return true;
}
-bool Cluster::loggable(const AMQFrame& f) {
- const AMQMethodBody* method = (f.getMethod());
- if (!method) return true; // Not a method
- bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID
- && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
- return !isClock;
-}
-
}} // namespace qpid::cluster
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org