You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [13/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h Thu Feb 28 16:14:30 2013
@@ -26,7 +26,6 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -34,9 +33,11 @@
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qmf/org/apache/qpid/broker/Memory.h"
#include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
#include <memory>
#include <string>
#include <map>
@@ -45,6 +46,9 @@ namespace qpid {
namespace broker {
class ConnectionState;
}
+namespace sys {
+class Timer;
+}
namespace management {
class ManagementAgent
@@ -73,11 +77,6 @@ public:
/** Called before plugins are initialized */
void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
- /** Called after plugins are initialized. */
- void pluginsInitialized();
-
- /** Called by cluster to suppress management output during update. */
- void suppress(bool s) { suppressed = s; }
void setName(const std::string& vendor,
const std::string& product,
@@ -100,18 +99,16 @@ public:
const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0,
- bool persistent = false);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- const std::string& key,
- bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ uint64_t persistId = 0,
+ bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ const std::string& key,
+ bool persistent = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
- QPID_BROKER_EXTERN void clusterUpdate();
-
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args,
@@ -121,25 +118,6 @@ public:
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
- /** Disallow all QMFv1 methods (used in clustered brokers). */
- void disallowV1Methods() { disallowAllV1Methods = true; }
-
- /** Serialize my schemas as a binary blob into schemaOut */
- void exportSchemas(std::string& schemaOut);
-
- /** Serialize my remote-agent map as a binary blob into agentsOut */
- void exportAgents(std::string& agentsOut);
-
- /** Decode a serialized schemas and add to my schema cache */
- void importSchemas(framing::Buffer& inBuf);
-
- /** Decode a serialized agent map */
- void importAgents(framing::Buffer& inBuf);
-
- // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
- uint64_t getNextObjectId(void) { return nextObjectId; }
- void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
@@ -148,20 +126,11 @@ public:
static types::Variant::Map toMap(const framing::FieldTable& from);
- // For Clustering: management objects that have been marked as
- // "deleted", but are waiting for their last published object
- // update are not visible to the cluster replication code. These
- // interfaces allow clustering to gather up all the management
- // objects that are deleted in order to allow all clustered
- // brokers to publish the same set of deleted objects.
-
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
- DeletedObject(ManagementObject *, bool v1, bool v2);
- DeletedObject( const std::string &encoded );
+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
~DeletedObject() {};
- void encode( std::string& toBuffer );
const std::string getKey() const {
// used to batch up objects of the same class type
return std::string(packageName + std::string(":") + className);
@@ -181,22 +150,7 @@ public:
typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
- /** returns a snapshot of all currently deleted management objects. */
- void exportDeletedObjects( DeletedObjectList& outList );
-
- /** Import a list of deleted objects to send on next publish interval. */
- void importDeletedObjects( const DeletedObjectList& inList );
-
private:
- struct Periodic : public qpid::sys::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
@@ -207,9 +161,9 @@ private:
uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
- qmf::org::apache::qpid::broker::Agent* mgmtObject;
- RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
- ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
@@ -276,7 +230,7 @@ private:
PackageMap packages;
//
- // Protected by userLock
+ // Protected by objectLock
//
ManagementObjectMap managementObjects;
@@ -288,11 +242,11 @@ private:
framing::Uuid uuid;
//
- // Lock hierarchy: If a thread needs to take both addLock and userLock,
- // it MUST take userLock first, then addLock.
+ // Lock ordering: userLock -> addLock -> objectLock
//
sys::Mutex userLock;
sys::Mutex addLock;
+ sys::Mutex objectLock;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
@@ -335,53 +289,51 @@ private:
// list of objects that have been deleted, but have yet to be published
// one final time.
// Indexed by a string composed of the object's package and class name.
- // Protected by userLock.
+ // Protected by objectLock.
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
- framing::ResizableBuffer msgBuffer;
+ // Pollable queue to serialize event messages
+ typedef std::pair<boost::shared_ptr<broker::Exchange>,
+ broker::Message> ExchangeAndMessage;
+ typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
//
// Memory statistics object
//
- qmf::org::apache::qpid::broker::Memory *memstat;
+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
void writeData ();
void periodicProcessing (void);
- void deleteObjectNowLH(const ObjectId& oid);
+ void deleteObjectNow(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- const std::string& exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void moveNewObjectsLH();
- bool moveDeletedObjectsLH();
+ EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
+ void sendBuffer(framing::Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
+ void sendBuffer(framing::Buffer& buf,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void moveNewObjects();
+ bool moveDeletedObjects();
- bool authorizeAgentMessageLH(qpid::broker::Message& msg);
- void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
+ bool authorizeAgentMessage(qpid::broker::Message& msg);
+ void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
@@ -399,22 +351,22 @@ private:
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
- void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
- uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
- void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+ void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+ uint32_t code = 0, const std::string& text = "OK");
+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
@@ -424,6 +376,7 @@ private:
std::string summarizeAgents();
void debugSnapshot(const char* title);
+ std::auto_ptr<EventQueue> sendQueue;
};
void setManagementExecutionContext(const qpid::broker::ConnectionState*);
Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1375509-1450773
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp Thu Feb 28 16:14:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
#include "qpid/messaging/Session.h"
#include "qpid/messaging/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/messaging/ProtocolRegistry.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/log/Statement.h"
@@ -40,22 +41,32 @@ Connection& Connection::operator=(const
Connection::~Connection() { PI::dtor(*this); }
Connection::Connection(const std::string& url, const std::string& o)
-{
+{
Variant::Map options;
AddressParser parser(o);
if (o.empty() || parser.parseMap(options)) {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
} else {
throw InvalidOptionString("Invalid option string: " + o);
}
}
Connection::Connection(const std::string& url, const Variant::Map& options)
{
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
}
Connection::Connection()
-{
+{
Variant::Map options;
std::string url = "amqp:tcp:127.0.0.1:5672";
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
@@ -67,12 +78,12 @@ bool Connection::isOpen() const { return
void Connection::close() { impl->close(); }
Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
Session Connection::createTransactionalSession(const std::string& name)
-{
+{
return impl->newSession(true, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
void Connection::setOption(const std::string& name, const Variant& value)
-{
+{
impl->setOption(name, value);
}
std::string Connection::getAuthenticatedUsername()
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp Thu Feb 28 16:14:30 2013
@@ -46,26 +46,26 @@ const std::string& Message::getSubject()
void Message::setContentType(const std::string& s) { impl->setContentType(s); }
const std::string& Message::getContentType() const { return impl->getContentType(); }
-void Message::setMessageId(const std::string& id) { impl->messageId = id; }
-const std::string& Message::getMessageId() const { return impl->messageId; }
+void Message::setMessageId(const std::string& id) { impl->setMessageId(id); }
+const std::string& Message::getMessageId() const { return impl->getMessageId(); }
-void Message::setUserId(const std::string& id) { impl->userId = id; }
-const std::string& Message::getUserId() const { return impl->userId; }
+void Message::setUserId(const std::string& id) { impl->setUserId(id); }
+const std::string& Message::getUserId() const { return impl->getUserId(); }
-void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
-const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+void Message::setCorrelationId(const std::string& id) { impl->setCorrelationId(id); }
+const std::string& Message::getCorrelationId() const { return impl->getCorrelationId(); }
-uint8_t Message::getPriority() const { return impl->priority; }
-void Message::setPriority(uint8_t priority) { impl->priority = priority; }
+uint8_t Message::getPriority() const { return impl->getPriority(); }
+void Message::setPriority(uint8_t priority) { impl->setPriority(priority); }
-void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
-Duration Message::getTtl() const { return Duration(impl->ttl); }
+void Message::setTtl(Duration ttl) { impl->setTtl(ttl.getMilliseconds()); }
+Duration Message::getTtl() const { return Duration(impl->getTtl()); }
-void Message::setDurable(bool durable) { impl->durable = durable; }
-bool Message::getDurable() const { return impl->durable; }
+void Message::setDurable(bool durable) { impl->setDurable(durable); }
+bool Message::getDurable() const { return impl->isDurable(); }
-bool Message::getRedelivered() const { return impl->redelivered; }
-void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; }
+bool Message::getRedelivered() const { return impl->isRedelivered(); }
+void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); }
const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
Variant::Map& Message::getProperties() { return impl->getHeaders(); }
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp Thu Feb 28 16:14:30 2013
@@ -45,28 +45,163 @@ MessageImpl::MessageImpl(const char* cha
bytes(chars, count),
internalId(0) {}
-void MessageImpl::setReplyTo(const Address& d) { replyTo = d; }
-const Address& MessageImpl::getReplyTo() const { return replyTo; }
+void MessageImpl::setReplyTo(const Address& d)
+{
+ replyTo = d;
+ updated();
+}
+const Address& MessageImpl::getReplyTo() const
+{
+ if (!replyTo && encoded) encoded->getReplyTo(replyTo);
+ return replyTo;
+}
-void MessageImpl::setSubject(const std::string& s) { subject = s; }
-const std::string& MessageImpl::getSubject() const { return subject; }
+void MessageImpl::setSubject(const std::string& s)
+{
+ subject = s;
+ updated();
+}
+const std::string& MessageImpl::getSubject() const
+{
+ if (!subject.size() && encoded) encoded->getSubject(subject);
+ return subject;
+}
-void MessageImpl::setContentType(const std::string& s) { contentType = s; }
-const std::string& MessageImpl::getContentType() const { return contentType; }
+void MessageImpl::setContentType(const std::string& s)
+{
+ contentType = s;
+ updated();
+}
+const std::string& MessageImpl::getContentType() const
+{
+ if (!contentType.size() && encoded) encoded->getContentType(contentType);
+ return contentType;
+}
-const Variant::Map& MessageImpl::getHeaders() const { return headers; }
-Variant::Map& MessageImpl::getHeaders() { return headers; }
-void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) { headers[key] = val; }
+void MessageImpl::setMessageId(const std::string& s)
+{
+ messageId = s;
+ updated();
+}
+const std::string& MessageImpl::getMessageId() const
+{
+ if (!messageId.size() && encoded) encoded->getMessageId(messageId);
+ return messageId;
+}
+void MessageImpl::setUserId(const std::string& s)
+{
+ userId = s;
+ updated();
+}
+const std::string& MessageImpl::getUserId() const
+{
+ if (!userId.size() && encoded) encoded->getUserId(userId);
+ return userId;
+}
+void MessageImpl::setCorrelationId(const std::string& s)
+{
+ correlationId = s;
+ updated();
+}
+const std::string& MessageImpl::getCorrelationId() const
+{
+ if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
+ return correlationId;
+}
+void MessageImpl::setPriority(uint8_t p)
+{
+ priority = p;
+}
+uint8_t MessageImpl::getPriority() const
+{
+ return priority;
+}
+void MessageImpl::setTtl(uint64_t t)
+{
+ ttl = t;
+}
+uint64_t MessageImpl::getTtl() const
+{
+ return ttl;
+}
+void MessageImpl::setDurable(bool d)
+{
+ durable = d;
+}
+bool MessageImpl::isDurable() const
+{
+ return durable;
+}
+void MessageImpl::setRedelivered(bool b)
+{
+ redelivered = b;
+}
+bool MessageImpl::isRedelivered() const
+{
+ return redelivered;
+}
+
+const Variant::Map& MessageImpl::getHeaders() const
+{
+ if (!headers.size() && encoded) encoded->populate(headers);
+ return headers;
+}
+Variant::Map& MessageImpl::getHeaders() {
+ if (!headers.size() && encoded) encoded->populate(headers);
+ updated();
+ return headers;
+}
+void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val)
+{
+ headers[key] = val; updated();
+}
//should these methods be on MessageContent?
-void MessageImpl::setBytes(const std::string& c) { bytes = c; }
-void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); }
-const std::string& MessageImpl::getBytes() const { return bytes; }
-std::string& MessageImpl::getBytes() { return bytes; }
+void MessageImpl::setBytes(const std::string& c)
+{
+ bytes = c;
+ updated();
+}
+void MessageImpl::setBytes(const char* chars, size_t count)
+{
+ bytes.assign(chars, count);
+ updated();
+}
+void MessageImpl::appendBytes(const char* chars, size_t count)
+{
+ bytes.append(chars, count);
+ updated();
+}
+const std::string& MessageImpl::getBytes() const
+{
+ if (!bytes.size() && encoded) encoded->getBody(bytes);
+ return bytes;
+}
+std::string& MessageImpl::getBytes()
+{
+ if (!bytes.size() && encoded) encoded->getBody(bytes);
+ updated();//have to assume body may be edited, invalidating our message
+ return bytes;
+}
void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; }
+void MessageImpl::updated()
+{
+
+ if (!replyTo && encoded) encoded->getReplyTo(replyTo);
+ if (!subject.size() && encoded) encoded->getSubject(subject);
+ if (!contentType.size() && encoded) encoded->getContentType(contentType);
+ if (!messageId.size() && encoded) encoded->getMessageId(messageId);
+ if (!userId.size() && encoded) encoded->getUserId(userId);
+ if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
+ if (!headers.size() && encoded) encoded->populate(headers);
+ if (!bytes.size() && encoded) encoded->getBody(bytes);
+
+ encoded.reset();
+}
+
MessageImpl& MessageImplAccess::get(Message& msg)
{
return *msg.impl;
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h Thu Feb 28 16:14:30 2013
@@ -24,52 +24,77 @@
#include "qpid/messaging/Address.h"
#include "qpid/types/Variant.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/messaging/amqp/EncodedMessage.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace messaging {
-struct MessageImpl
+class MessageImpl
{
- Address replyTo;
- std::string subject;
- std::string contentType;
- std::string messageId;
- std::string userId;
- std::string correlationId;
+ private:
+ mutable Address replyTo;
+ mutable std::string subject;
+ mutable std::string contentType;
+ mutable std::string messageId;
+ mutable std::string userId;
+ mutable std::string correlationId;
uint8_t priority;
uint64_t ttl;
bool durable;
bool redelivered;
- qpid::types::Variant::Map headers;
+ mutable qpid::types::Variant::Map headers;
- std::string bytes;
+ mutable std::string bytes;
+ boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded;
qpid::framing::SequenceNumber internalId;
+ void updated();
+ public:
MessageImpl(const std::string& c);
MessageImpl(const char* chars, size_t count);
void setReplyTo(const Address& d);
const Address& getReplyTo() const;
-
+
void setSubject(const std::string& s);
const std::string& getSubject() const;
-
+
void setContentType(const std::string& s);
const std::string& getContentType() const;
-
+
+ void setMessageId(const std::string&);
+ const std::string& getMessageId() const;
+ void setUserId(const std::string& );
+ const std::string& getUserId() const;
+ void setCorrelationId(const std::string& );
+ const std::string& getCorrelationId() const;
+ void setPriority(uint8_t);
+ uint8_t getPriority() const;
+ void setTtl(uint64_t);
+ uint64_t getTtl() const;
+ void setDurable(bool);
+ bool isDurable() const;
+ void setRedelivered(bool);
+ bool isRedelivered() const;
+
+
const qpid::types::Variant::Map& getHeaders() const;
qpid::types::Variant::Map& getHeaders();
void setHeader(const std::string& key, const qpid::types::Variant& val);
-
+
void setBytes(const std::string& bytes);
void setBytes(const char* chars, size_t count);
+ void appendBytes(const char* chars, size_t count);
const std::string& getBytes() const;
std::string& getBytes();
void setInternalId(qpid::framing::SequenceNumber id);
qpid::framing::SequenceNumber getInternalId();
-
+ void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; }
+ boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; }
};
class Message;
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h Thu Feb 28 16:14:30 2013
@@ -22,10 +22,12 @@
*
*/
#include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
namespace qpid {
namespace messaging {
+class Duration;
class Message;
class MessageListener;
class Session;
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h Thu Feb 28 16:14:30 2013
@@ -22,6 +22,7 @@
*
*/
#include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
namespace qpid {
namespace messaging {
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt Thu Feb 28 16:14:30 2013
@@ -42,6 +42,7 @@ if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties (store PROPERTIES
PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
endif (CMAKE_COMPILER_IS_GNUCXX)
@@ -54,7 +55,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
endif (MSVC)
endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
-set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+set_target_properties (store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ VERSION ${qpidc_version})
install (TARGETS store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
@@ -81,6 +84,7 @@ if (BUILD_MSSQL)
ms-sql/State.cpp
ms-sql/TplRecordset.cpp
ms-sql/VariantHelper.cpp)
+ set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
@@ -110,6 +114,7 @@ if (BUILD_MSCLFS)
ms-sql/State.cpp
ms-sql/VariantHelper.cpp)
include_directories(ms-sql)
+ set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib)
install (TARGETS msclfs_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp Thu Feb 28 16:14:30 2013
@@ -137,12 +137,6 @@ MessageStorePlugin::providerAvailable(co
QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
}
-void
-MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
-{
- QPID_LOG(info, "Store: truncateInit");
-}
-
/**
* Record the existence of a durable queue
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h Thu Feb 28 16:14:30 2013
@@ -24,18 +24,22 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
-#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/PersistableExchange.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/management/Manageable.h"
+//#include "qpid/management/Manageable.h"
#include <string>
using namespace qpid;
namespace qpid {
+
+namespace broker {
+class Broker;
+class PersistableExchange;
+class PersistableMessage;
+class PersistableQueue;
+}
+
namespace store {
class StorageProvider;
@@ -82,18 +86,6 @@ class MessageStorePlugin :
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called before recovery, will discard the database and reinitialize
- * using an empty store. This is used when cluster nodes recover and
- * must get their content from a cluster sync rather than directly from
- * the store.
- *
- * @param saveStoreContent If true, the store's contents should be
- * saved to a backup location before
- * reinitializing the store content.
- */
- virtual void truncateInit(const bool saveStoreContent = false);
/**
* Record the existence of a durable queue
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h Thu Feb 28 16:14:30 2013
@@ -143,20 +143,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get thier content from a cluster sync rather than directly fromt the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
/**
* Record the existence of a durable queue
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Feb 28 16:14:30 2013
@@ -26,6 +26,7 @@
#include <string>
#include <windows.h>
#include <clfsw32.h>
+#include <qpid/broker/Broker.h>
#include <qpid/broker/RecoverableQueue.h>
#include <qpid/log/Statement.h>
#include <qpid/store/MessageStorePlugin.h>
@@ -108,20 +109,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get their content from a cluster sync rather than directly from the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false);
/**
* Record the existence of a durable queue
@@ -467,11 +454,6 @@ MSSqlClfsProvider::activate(MessageStore
}
void
-MSSqlClfsProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
MSSqlClfsProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Thu Feb 28 16:14:30 2013
@@ -92,20 +92,6 @@ public:
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
- //@{
- /**
- * If called after init() but before recovery, will discard the database
- * and reinitialize using an empty store dir. If @a pushDownStoreFiles
- * is true, the content of the store dir will be moved to a backup dir
- * inside the store dir. This is used when cluster nodes recover and must
- * get thier content from a cluster sync rather than directly fromt the
- * store.
- *
- * @param pushDownStoreFiles If true, will move content of the store dir
- * into a subdir, leaving the store dir
- * otherwise empty.
- */
- virtual void truncateInit(const bool pushDownStoreFiles = false);
/**
* Record the existence of a durable queue
@@ -392,11 +378,6 @@ MSSqlProvider::activate(MessageStorePlug
}
void
-MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
MSSqlProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Thu Feb 28 16:14:30 2013
@@ -147,7 +147,7 @@ MessageRecordset::recover(qpid::broker::
// Now, do we need the rest of the content?
long contentLength = blobSize - headerFieldLength - headerSize;
- if (msg->loadContent(contentLength)) {
+ if (contentLength > 0 && msg->loadContent(contentLength)) {
BlobAdapter content(contentLength);
content =
rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp Thu Feb 28 16:14:30 2013
@@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.
void AggregateOutput::activateOutput() { control.activateOutput(); }
-void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-
namespace {
// Clear the busy flag and notify waiting threads in destructor.
struct ScopedBusy {
@@ -51,6 +49,7 @@ bool AggregateOutput::doOutput() {
while (!tasks.empty()) {
OutputTask* t=tasks.front();
tasks.pop_front();
+ taskSet.erase(t);
bool didOutput;
{
// Allow concurrent call to addOutputTask.
@@ -59,7 +58,9 @@ bool AggregateOutput::doOutput() {
didOutput = t->doOutput();
}
if (didOutput) {
- tasks.push_back(t);
+ if (taskSet.insert(t).second) {
+ tasks.push_back(t);
+ }
return true;
}
}
@@ -68,12 +69,15 @@ bool AggregateOutput::doOutput() {
void AggregateOutput::addOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
- tasks.push_back(task);
+ if (taskSet.insert(task).second) {
+ tasks.push_back(task);
+ }
}
void AggregateOutput::removeOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.erase(task);
tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
@@ -81,6 +85,7 @@ void AggregateOutput::removeAll()
{
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.clear();
tasks.clear();
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h Thu Feb 28 16:14:30 2013
@@ -28,6 +28,7 @@
#include <algorithm>
#include <deque>
+#include <set>
namespace qpid {
namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
+ typedef std::set<OutputTask*> TaskSet;
Monitor lock;
TaskList tasks;
+ TaskSet taskSet;
bool busy;
OutputControl& control;
@@ -56,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN Aggregate
// These may be called concurrently with any function.
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t);
QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
// These functions must not be called concurrently with each other.
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h Thu Feb 28 16:14:30 2013
@@ -21,9 +21,11 @@
*
*/
-#include "qpid/sys/IntegerTypes.h"
#include "qpid/CommonImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/SecuritySettings.h"
+
#include <string.h>
#include <boost/function.hpp>
@@ -56,6 +58,7 @@ class AsynchConnector {
public:
typedef boost::function1<void, const Socket&> ConnectedCallback;
typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback;
+ typedef boost::function1<void, AsynchConnector&> RequestCallback;
// Call create() to allocate a new AsynchConnector object with the
// specified poller, addressing, and callbacks.
@@ -70,6 +73,7 @@ public:
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
virtual void stop() {};
+ virtual void requestCallback(RequestCallback) = 0;
protected:
AsynchConnector() {}
virtual ~AsynchConnector() {}
@@ -155,11 +159,11 @@ public:
virtual void notifyPendingWrite() = 0;
virtual void queueWriteClose() = 0;
virtual bool writeQueueEmpty() = 0;
- virtual void startReading() = 0;
- virtual void stopReading() = 0;
virtual void requestCallback(RequestCallback) = 0;
virtual BufferBase* getQueuedBuffer() = 0;
+ virtual SecuritySettings getSecuritySettings() = 0;
+
protected:
// Derived class manages lifetime; must be constructed using the
// static create() method. Deletes not allowed from outside.
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Thu Feb 28 16:14:30 2013
@@ -51,15 +51,15 @@ struct ProtocolTimeoutTask : public sys:
}
};
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
- isClient(false),
- readCredit(InfiniteCredit)
+ isClient(isClient0),
+ nodict(nodict0)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -97,25 +97,20 @@ void AsynchIOHandler::abort() {
if (!readError) {
aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
}
+ aio->queueWriteClose();
}
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-// Input side
-void AsynchIOHandler::giveReadCredit(int32_t credit) {
- // Check whether we started in the don't about credit state
- if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
- return;
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (readCredit.fetchAndAdd(credit) != 0)
- return;
- assert(readCredit.get() >= 0);
- if (readCredit.get() != 0)
- aio->startReading();
+namespace {
+ SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
+ {
+ SecuritySettings settings = aio->getSecuritySettings();
+ settings.nodict = nodict;
+ return settings;
+ }
}
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
@@ -123,26 +118,6 @@ void AsynchIOHandler::readbuff(AsynchIO&
return;
}
- // Check here for read credit
- if (readCredit.get() != InfiniteCredit) {
- if (readCredit.get() == 0) {
- // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups".
- // readbuff is sometimes called with no credit.
- // This should be fixed somewhere else to avoid such calls.
- aio->unread(buff);
- return;
- }
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (--readCredit == 0) {
- assert(readCredit.get() >= 0);
- if (readCredit.get() == 0) {
- aio->stopReading();
- }
- }
- }
-
++reads;
size_t decoded = 0;
if (codec) { // Already initiated
@@ -168,13 +143,16 @@ void AsynchIOHandler::readbuff(AsynchIO&
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict));
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
aio->queueWriteClose();
+ } else {
+ //read any further data that may already have been sent
+ decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
}
} catch (const std::exception& e) {
QPID_LOG(error, e.what());
@@ -223,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&)
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, SecuritySettings());
+ codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
write(framing::ProtocolInitiation(codec->getVersion()));
// We've just sent the protocol negotiation so we can cancel the timeout for that
// This is not ideal, because we've not received anything yet, but heartbeats will
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Thu Feb 28 16:14:30 2013
@@ -51,24 +51,19 @@ class AsynchIOHandler : public OutputCon
uint32_t reads;
bool readError;
bool isClient;
- AtomicValue<int32_t> readCredit;
- static const int32_t InfiniteCredit = -1;
- Mutex creditLock;
+ bool nodict;
boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
- QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
// Output side
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t credit);
// Input side
QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h Thu Feb 28 16:14:30 2013
@@ -42,7 +42,7 @@ class Codec
/** Encode into buffer, return number of bytes encoded */
- virtual std::size_t encode(const char* buffer, std::size_t size) = 0;
+ virtual std::size_t encode(char* buffer, std::size_t size) = 0;
/** Return true if we have data to encode */
virtual bool canEncode() = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Thu Feb 28 16:14:30 2013
@@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : publi
size_t getBuffered() const { return next->getBuffered(); }
void abort() { next->abort(); }
void activateOutput() { next->activateOutput(); }
- void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void send(framing::AMQFrame& f) { next->send(f); }
private:
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h Thu Feb 28 16:14:30 2013
@@ -54,6 +54,15 @@ class FileSysDir
void mkdir(void);
+ typedef void Callback(const std::string&);
+
+ /**
+ * Call the Callback function for every regular file in the directory
+ *
+ * @param cb Callback function that receives the full path to the file
+ */
+ void forEachFile(Callback cb) const;
+
std::string getPath () { return dirPath; }
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h Thu Feb 28 16:14:30 2013
@@ -1,3 +1,6 @@
+#ifndef QPID_SYS_OUTPUT_CONTROL_H
+#define QPID_SYS_OUTPUT_CONTROL_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,9 +24,6 @@
#include "qpid/sys/IntegerTypes.h"
-#ifndef _OutputControl_
-#define _OutputControl_
-
namespace qpid {
namespace sys {
@@ -33,11 +33,10 @@ namespace sys {
virtual ~OutputControl() {}
virtual void abort() = 0;
virtual void activateOutput() = 0;
- virtual void giveReadCredit(int32_t credit) = 0;
};
}
}
-#endif
+#endif /*!QPID_SYS_OUTPUT_CONTROL_H*/
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h Thu Feb 28 16:14:30 2013
@@ -42,10 +42,10 @@ class ProtocolFactory : public qpid::Sha
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
- virtual bool supports(const std::string& /*capability*/) { return false; }
};
inline ProtocolFactory::~ProtocolFactory() {}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
@@ -67,7 +68,6 @@ class RdmaIOHandler : public OutputContr
void close();
void abort();
void activateOutput();
- void giveReadCredit(int32_t credit);
void initProtocolOut();
// Input side
@@ -83,7 +83,7 @@ class RdmaIOHandler : public OutputContr
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getFullName()),
+ identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
factory(f),
codec(0),
readError(false),
@@ -199,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&
QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
-// TODO: Dummy implementation of read throttling
-void RdmaIOHandler::giveReadCredit(int32_t) {
-}
-
// The logic here is subtly different from TCP as RDMA is message oriented
// so we define that an RDMA message is a frame - in this case there is no putting back
// of any message remainder - there shouldn't be any. And what we read here can't be
@@ -250,7 +246,7 @@ class RdmaIOProtocolFactory : public Pro
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
@@ -371,6 +367,7 @@ void RdmaIOProtocolFactory::connected(Po
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& /*name*/,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h Thu Feb 28 16:14:30 2013
@@ -33,8 +33,12 @@ namespace sys {
class SecurityLayer : public Codec
{
public:
+ SecurityLayer(int ssf_) : ssf(ssf_) {}
+ int getSsf() const { return ssf; }
virtual void init(Codec*) = 0;
virtual ~SecurityLayer() {}
+ private:
+ int ssf;
};
}} // namespace qpid::sys
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h Thu Feb 28 16:14:30 2013
@@ -21,6 +21,8 @@
* under the License.
*
*/
+#include <string>
+
namespace qpid {
namespace sys {
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h Thu Feb 28 16:14:30 2013
@@ -22,7 +22,6 @@
*
*/
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/IntegerTypes.h"
#include "qpid/CommonImportExport.h"
#include <string>
@@ -31,45 +30,43 @@ namespace qpid {
namespace sys {
class Duration;
+class IOHandle;
class SocketAddress;
-class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
+class Socket
{
public:
- /** Create a socket wrapper for descriptor. */
- QPID_COMMON_EXTERN Socket();
+ virtual ~Socket() {};
- /** Create a new Socket which is the same address family as this one */
- QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
+ virtual operator const IOHandle&() const = 0;
/** Set socket non blocking */
- void setNonblocking() const;
+ virtual void setNonblocking() const = 0;
- QPID_COMMON_EXTERN void setTcpNoDelay() const;
+ virtual void setTcpNoDelay() const = 0;
- QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
- QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
+ virtual void connect(const SocketAddress&) const = 0;
+ virtual void finishConnect(const SocketAddress&) const = 0;
- QPID_COMMON_EXTERN void close() const;
+ virtual void close() const = 0;
/** Bind to a port and start listening.
*@param port 0 means choose an available port.
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
- QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+ virtual int listen(const SocketAddress&, int backlog = 10) const = 0;
/**
* Returns an address (host and port) for the remote end of the
* socket
*/
- QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ virtual std::string getPeerAddress() const = 0;
/**
* Returns an address (host and port) for the local end of the
* socket
*/
- QPID_COMMON_EXTERN std::string getLocalAddress() const;
+ virtual std::string getLocalAddress() const = 0;
/**
* Returns the full address of the connection: local and remote host and port.
@@ -80,31 +77,24 @@ public:
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
*/
- QPID_COMMON_EXTERN int getError() const;
+ virtual int getError() const = 0;
/** Accept a connection from a socket that is already listening
* and has an incoming connection
*/
- QPID_COMMON_EXTERN Socket* accept() const;
+ virtual Socket* accept() const = 0;
- // TODO The following are raw operations, maybe they need better wrapping?
- QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
- QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
-
-private:
- /** Create socket */
- void createSocket(const SocketAddress&) const;
+ virtual int read(void *buf, size_t count) const = 0;
+ virtual int write(const void *buf, size_t count) const = 0;
-public:
- /** Construct socket with existing handle */
- Socket(IOHandlePrivate*);
-
-protected:
- mutable std::string localname;
- mutable std::string peername;
- mutable bool nonblocking;
- mutable bool nodelay;
+ /* Transport security related: */
+ virtual int getKeyLen() const = 0;
+ virtual std::string getClientAuthId() const = 0;
};
+/** Make the default socket for whatever platform we are executing on
+ */
+QPID_COMMON_EXTERN Socket* createSocket();
+
}}
#endif /*!_sys_Socket_h*/
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h Thu Feb 28 16:14:30 2013
@@ -44,11 +44,12 @@ public:
QPID_COMMON_EXTERN bool nextAddress();
QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN std::string getHost() const;
QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
-
+
private:
std::string host;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -22,19 +22,19 @@
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/Plugin.h"
-#include "qpid/sys/ssl/check.h"
-#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <memory>
-
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -64,38 +64,32 @@ struct SslServerOptions : ssl::SslOption
}
};
-template <class T>
-class SslProtocolFactoryTmpl : public ProtocolFactory {
- private:
-
- typedef SslAcceptorTmpl<T> SslAcceptor;
-
+class SslProtocolFactory : public ProtocolFactory {
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
Timer& brokerTimer;
uint32_t maxNegotiateTime;
+ uint16_t listeningPort;
const bool tcpNoDelay;
- T listener;
- const uint16_t listeningPort;
- std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
+ SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+ Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
- boost::function2<void, int, std::string> failed);
+ ConnectFailedCallback);
uint16_t getPort() const;
- bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
+ void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
-typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
-typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
-
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -124,7 +118,7 @@ static struct SslPlugin : public Plugin
}
}
}
-
+
void initialize(Target& target) {
QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
@@ -139,19 +133,16 @@ static struct SslPlugin : public Plugin
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(options.multiplex ?
- static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime)) :
- static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime)));
- QPID_LOG(notice, "Listening for " <<
- (options.multiplex ? "SSL or TCP" : "SSL") <<
- " connections on TCP port " <<
- protocol->getPort());
+ ProtocolFactory::shared_ptr protocol(
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
+
+ if (protocol->getPort()!=0 ) {
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP/TCP6 port " <<
+ protocol->getPort());
+ }
+
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -161,99 +152,133 @@ static struct SslPlugin : public Plugin
}
} sslPlugin;
-template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+namespace {
+ // Expand list of Interfaces and addresses to a list of addresses
+ std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+ std::vector<std::string> addresses;
+ // If there are no specific interfaces listed use a single "" to listen on every interface
+ if (interfaces.empty()) {
+ addresses.push_back("");
+ return addresses;
+ }
+ for (unsigned i = 0; i < interfaces.size(); ++i) {
+ const std::string& interface = interfaces[i];
+ if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+ // We don't have an interface of that name -
+ // Check for IPv6 ('[' ']') brackets and remove them
+ // then pass to be looked up directly
+ if (interface[0]=='[' && interface[interface.size()-1]==']') {
+ addresses.push_back(interface.substr(1, interface.size()-2));
+ } else {
+ addresses.push_back(interface);
+ }
+ }
+ }
+ return addresses;
+ }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+ Timer& timer) :
brokerTimer(timer),
- maxNegotiateTime(maxTime),
- tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
+ maxNegotiateTime(opts.maxNegotiateTime),
+ tcpNoDelay(opts.tcpNoDelay),
nodict(options.nodict)
-{}
-
-void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient,
- Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
- qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
-
- if (tcpNoDelay) {
- s.setTcpNoDelay(tcpNoDelay);
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+{
+ std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+ if (addresses.empty()) {
+ // We specified some interfaces, but couldn't find addresses for them
+ QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
+ listeningPort = 0;
}
- if (isClient) {
- async->setClient();
+ for (unsigned i = 0; i<addresses.size(); ++i) {
+ QPID_LOG(debug, "Using interface: " << addresses[i]);
+ SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = options.multiplex ?
+ new SslMuxSocket(options.certName, options.clientAuth) :
+ new SslSocket(options.certName, options.clientAuth);
+ uint16_t lport = s->listen(sa, opts.connectionBacklog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
}
-
- qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
- boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2),
- boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
- boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
-
- async->init(aio,timer, maxTime);
- aio->start(poller);
}
-template <>
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
}
-template <class T>
-uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
- return listeningPort; // Immutable no need for lock.
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- acceptor.reset(
- new SslAcceptor(listener,
- boost::bind(&SslProtocolFactoryTmpl<T>::established,
- this, poller, _1, fact, false)));
- acceptor->start(poller);
-}
-
-template <>
-void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
- if (sslSock) {
- SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
- return;
- }
-
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
-
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient) {
- async->setClient();
- }
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
+ AsynchIO* aio = AsynchIO::create(
+ s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
-template <class T>
-void SslProtocolFactoryTmpl<T>::connect(
+uint16_t SslProtocolFactory::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+void SslProtocolFactory::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
+ acceptors[i].start(poller);
+ }
+}
+
+void SslProtocolFactory::connectFailed(
+ const Socket& s, int ec, const std::string& emsg,
+ ConnectFailedCallback failedCb)
+{
+ failedCb(ec, emsg);
+ s.close();
+ delete &s;
+}
+
+void SslProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -264,31 +289,23 @@ void SslProtocolFactoryTmpl<T>::connect(
// shutdown. The allocated SslConnector frees itself when it
// is no longer needed.
- qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new SslConnector(*socket, poller, host, port,
- boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
- failed);
-}
-
-namespace
-{
-const std::string SSL = "ssl";
-}
-
-template <>
-bool SslProtocolFactory::supports(const std::string& capability)
-{
- std::string s = capability;
- transform(s.begin(), s.end(), s.begin(), tolower);
- return s == SSL;
-}
-
-template <>
-bool SslMuxProtocolFactory::supports(const std::string& capability)
-{
- std::string s = capability;
- transform(s.begin(), s.end(), s.begin(), tolower);
- return s == SSL || s == "tcp";
+ Socket* socket = new qpid::sys::ssl::SslSocket();
+ try {
+ AsynchConnector* c = AsynchConnector::create(
+ *socket,
+ host,
+ port,
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
+ boost::bind(&SslProtocolFactory::connectFailed,
+ this, _1, _2, _3, failed));
+ c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org