You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [13/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h Thu Feb 28 16:14:30 2013
@@ -26,7 +26,6 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/broker/ConnectionToken.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/management/ManagementEvent.h"
@@ -34,9 +33,11 @@
 #include "qmf/org/apache/qpid/broker/Agent.h"
 #include "qmf/org/apache/qpid/broker/Memory.h"
 #include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
 #include "qpid/types/Variant.h"
 #include <qpid/framing/AMQFrame.h>
 #include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
 #include <memory>
 #include <string>
 #include <map>
@@ -45,6 +46,9 @@ namespace qpid {
 namespace broker {
 class ConnectionState;
 }
+namespace sys {
+class Timer;
+}
 namespace management {
 
 class ManagementAgent
@@ -73,11 +77,6 @@ public:
     /** Called before plugins are initialized */
     void configure       (const std::string& dataDir, bool publish, uint16_t interval,
                           qpid::broker::Broker* broker, int threadPoolSize);
-    /** Called after plugins are initialized. */
-    void pluginsInitialized();
-
-    /** Called by cluster to suppress management output during update. */
-    void suppress(bool s) { suppressed = s; }
 
     void setName(const std::string& vendor,
                  const std::string& product,
@@ -100,18 +99,16 @@ public:
                                              const std::string& eventName,
                                              uint8_t*    md5Sum,
                                              ManagementObject::writeSchemaCall_t schemaCall);
-    QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject* object,
-                                             uint64_t          persistId = 0,
-                                             bool              persistent = false);
-    QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject*  object,
-                                             const std::string& key,
-                                             bool               persistent = false);
+    QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject::shared_ptr object,
+                                             uint64_t                     persistId = 0,
+                                             bool                         persistent = false);
+    QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject::shared_ptr object,
+                                             const std::string&           key,
+                                             bool                         persistent = false);
     QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);
 
-    QPID_BROKER_EXTERN void clusterUpdate();
-
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args,
@@ -121,25 +118,6 @@ public:
     /** Disallow a method. Attempts to call it will receive an exception with message. */
     void disallow(const std::string& className, const std::string& methodName, const std::string& message);
 
-    /** Disallow all QMFv1 methods (used in clustered brokers). */
-    void disallowV1Methods() { disallowAllV1Methods = true; }
-
-    /** Serialize my schemas as a binary blob into schemaOut */
-    void exportSchemas(std::string& schemaOut);
-
-    /** Serialize my remote-agent map as a binary blob into agentsOut */
-    void exportAgents(std::string& agentsOut);
-
-    /** Decode a serialized schemas and add to my schema cache */
-    void importSchemas(framing::Buffer& inBuf);
-
-    /** Decode a serialized agent map */
-    void importAgents(framing::Buffer& inBuf);
-
-    // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
-    uint64_t getNextObjectId(void) { return nextObjectId; }
-    void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
     uint16_t getBootSequence(void) { return bootSequence; }
     void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
 
@@ -148,20 +126,11 @@ public:
 
     static types::Variant::Map toMap(const framing::FieldTable& from);
 
-    // For Clustering: management objects that have been marked as
-    // "deleted", but are waiting for their last published object
-    // update are not visible to the cluster replication code.  These
-    // interfaces allow clustering to gather up all the management
-    // objects that are deleted in order to allow all clustered
-    // brokers to publish the same set of deleted objects.
-
     class DeletedObject {
       public:
         typedef boost::shared_ptr<DeletedObject> shared_ptr;
-        DeletedObject(ManagementObject *, bool v1, bool v2);
-        DeletedObject( const std::string &encoded );
+        DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
         ~DeletedObject() {};
-        void encode( std::string& toBuffer );
         const std::string getKey() const {
             // used to batch up objects of the same class type
             return std::string(packageName + std::string(":") + className);
@@ -181,22 +150,7 @@ public:
 
     typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
 
-    /** returns a snapshot of all currently deleted management objects. */
-    void exportDeletedObjects( DeletedObjectList& outList );
-
-    /** Import a list of deleted objects to send on next publish interval. */
-    void importDeletedObjects( const DeletedObjectList& inList );
-
 private:
-    struct Periodic : public qpid::sys::TimerTask
-    {
-        ManagementAgent& agent;
-
-        Periodic (ManagementAgent& agent, uint32_t seconds);
-        virtual ~Periodic ();
-        void fire ();
-    };
-
     //  Storage for tracking remote management agents, attached via the client
     //  management agent API.
     //
@@ -207,9 +161,9 @@ private:
         uint32_t          agentBank;
         std::string       routingKey;
         ObjectId          connectionRef;
-        qmf::org::apache::qpid::broker::Agent*    mgmtObject;
-        RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
-        ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+        qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+        RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+        ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
 
         virtual ~RemoteAgent ();
         void mapEncode(qpid::types::Variant::Map& _map) const;
@@ -276,7 +230,7 @@ private:
     PackageMap                   packages;
 
     //
-    // Protected by userLock
+    // Protected by objectLock
     //
     ManagementObjectMap          managementObjects;
 
@@ -288,11 +242,11 @@ private:
     framing::Uuid                uuid;
 
     //
-    // Lock hierarchy:  If a thread needs to take both addLock and userLock,
-    // it MUST take userLock first, then addLock.
+    // Lock ordering:  userLock -> addLock -> objectLock
     //
     sys::Mutex userLock;
     sys::Mutex addLock;
+    sys::Mutex objectLock;
 
     qpid::broker::Exchange::shared_ptr mExchange;
     qpid::broker::Exchange::shared_ptr dExchange;
@@ -335,53 +289,51 @@ private:
     // list of objects that have been deleted, but have yet to be published
     // one final time.
     // Indexed by a string composed of the object's package and class name.
-    // Protected by userLock.
+    // Protected by objectLock.
     typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
     PendingDeletedObjsMap pendingDeletedObjs;
 
-#   define MA_BUFFER_SIZE 65536
-    char inputBuffer[MA_BUFFER_SIZE];
-    char outputBuffer[MA_BUFFER_SIZE];
-    char eventBuffer[MA_BUFFER_SIZE];
-    framing::ResizableBuffer msgBuffer;
+    // Pollable queue to serialize event messages
+    typedef std::pair<boost::shared_ptr<broker::Exchange>,
+                      broker::Message> ExchangeAndMessage;
+    typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
 
     //
     // Memory statistics object
     //
-    qmf::org::apache::qpid::broker::Memory *memstat;
+    qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
 
     void writeData ();
     void periodicProcessing (void);
-    void deleteObjectNowLH(const ObjectId& oid);
+    void deleteObjectNow(const ObjectId& oid);
     void encodeHeader       (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
     bool checkHeader        (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
-    void sendBufferLH(framing::Buffer&             buf,
-                      uint32_t                     length,
-                      qpid::broker::Exchange::shared_ptr exchange,
-                      const std::string&           routingKey);
-    void sendBufferLH(framing::Buffer&             buf,
-                      uint32_t                     length,
-                      const std::string&           exchange,
-                      const std::string&           routingKey);
-    void sendBufferLH(const std::string&     data,
-                      const std::string&     cid,
-                      const qpid::types::Variant::Map& headers,
-                      const std::string&     content_type,
-                      qpid::broker::Exchange::shared_ptr exchange,
-                      const std::string& routingKey,
-                      uint64_t ttl_msec = 0);
-    void sendBufferLH(const std::string& data,
-                      const std::string& cid,
-                      const qpid::types::Variant::Map& headers,
-                      const std::string& content_type,
-                      const std::string& exchange,
-                      const std::string& routingKey,
-                      uint64_t ttl_msec = 0);
-    void moveNewObjectsLH();
-    bool moveDeletedObjectsLH();
+    EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
+    void sendBuffer(framing::Buffer&             buf,
+                    qpid::broker::Exchange::shared_ptr exchange,
+                    const std::string&           routingKey);
+    void sendBuffer(framing::Buffer&             buf,
+                    const std::string&           exchange,
+                    const std::string&           routingKey);
+    void sendBuffer(const std::string&     data,
+                    const std::string&     cid,
+                    const qpid::types::Variant::Map& headers,
+                    const std::string&     content_type,
+                    qpid::broker::Exchange::shared_ptr exchange,
+                    const std::string& routingKey,
+                    uint64_t ttl_msec = 0);
+    void sendBuffer(const std::string& data,
+                    const std::string& cid,
+                    const qpid::types::Variant::Map& headers,
+                    const std::string& content_type,
+                    const std::string& exchange,
+                    const std::string& routingKey,
+                    uint64_t ttl_msec = 0);
+    void moveNewObjects();
+    bool moveDeletedObjects();
 
-    bool authorizeAgentMessageLH(qpid::broker::Message& msg);
-    void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
+    bool authorizeAgentMessage(qpid::broker::Message& msg);
+    void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
 
     PackageMap::iterator findOrAddPackageLH(std::string name);
     void addClassLH(uint8_t                      kind,
@@ -399,22 +351,22 @@ private:
     uint32_t allocateNewBank ();
     uint32_t assignBankLH (uint32_t requestedPrefix);
     void deleteOrphanedAgentsLH();
-    void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
-                              uint32_t code = 0, const std::string& text = "OK");
-    void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
-    void handleBrokerRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handlePackageQueryLH   (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handlePackageIndLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleClassQueryLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleClassIndLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleSchemaRequestLH  (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
-    void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleAttachRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
-    void handleGetQueryLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleMethodRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
-    void handleGetQueryLH       (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
-    void handleMethodRequestLH  (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
-    void handleLocateRequestLH  (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+    void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+                             uint32_t code = 0, const std::string& text = "OK");
+    void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+    void handleBrokerRequest  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handlePackageQuery   (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handlePackageInd     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleClassQuery     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleClassInd       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleSchemaRequest  (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+    void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleAttachRequest  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+    void handleGetQuery       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleMethodRequest  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+    void handleGetQuery       (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+    void handleMethodRequest  (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+    void handleLocateRequest  (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
 
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);
@@ -424,6 +376,7 @@ private:
 
     std::string summarizeAgents();
     void debugSnapshot(const char* title);
+    std::auto_ptr<EventQueue> sendQueue;
 };
 
 void setManagementExecutionContext(const qpid::broker::ConnectionState*);

Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1375509-1450773

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/Connection.cpp Thu Feb 28 16:14:30 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/SessionImpl.h"
 #include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/messaging/ProtocolRegistry.h"
 #include "qpid/client/amqp0_10/ConnectionImpl.h"
 #include "qpid/log/Statement.h"
 
@@ -40,22 +41,32 @@ Connection& Connection::operator=(const 
 Connection::~Connection() { PI::dtor(*this); }
 
 Connection::Connection(const std::string& url, const std::string& o)
-{ 
+{ // Builds a connection from a URL and an option string; o is parsed into a Variant::Map unless it is empty.
     Variant::Map options;
     AddressParser parser(o);
     if (o.empty() || parser.parseMap(options)) {
-        PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+        ConnectionImpl* impl = ProtocolRegistry::create(url, options); // ask the protocol registry first; a null result is handled below
+        if (impl) {
+            PI::ctor(*this, impl);
+        } else {
+            PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); // registry returned null: use the AMQP 0-10 implementation, as the removed line did unconditionally
+        }
     } else {
         throw InvalidOptionString("Invalid option string: " + o);
     }
 }
 Connection::Connection(const std::string& url, const Variant::Map& options)
 {
-    PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+    ConnectionImpl* impl = ProtocolRegistry::create(url, options); // same selection as the option-string constructor: registry first
+    if (impl) {
+        PI::ctor(*this, impl);
+    } else {
+        PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); // null from the registry: fall back to the AMQP 0-10 implementation
+    }
 }
 
 Connection::Connection()
-{ 
+{
     Variant::Map options;
     std::string url = "amqp:tcp:127.0.0.1:5672";
     PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
@@ -67,12 +78,12 @@ bool Connection::isOpen() const { return
 void Connection::close() { impl->close(); }
 Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
 Session Connection::createTransactionalSession(const std::string& name)
-{ 
+{ // delegates to impl->newSession with the first argument true (createSession passes false)
     return impl->newSession(true, name);
 }
 Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
 void Connection::setOption(const std::string& name, const Variant& value)
-{ 
+{ // forwards the name/value pair unchanged to the implementation object
     impl->setOption(name, value);
 }
 std::string Connection::getAuthenticatedUsername()

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/Message.cpp Thu Feb 28 16:14:30 2013
@@ -46,26 +46,26 @@ const std::string& Message::getSubject()
 void Message::setContentType(const std::string& s) { impl->setContentType(s); }
 const std::string& Message::getContentType() const { return impl->getContentType(); }
 
-void Message::setMessageId(const std::string& id) { impl->messageId = id; }
-const std::string& Message::getMessageId() const { return impl->messageId; }
+void Message::setMessageId(const std::string& id) { impl->setMessageId(id); } // goes through the MessageImpl setter instead of assigning the field directly
+const std::string& Message::getMessageId() const { return impl->getMessageId(); } // goes through the MessageImpl getter instead of reading the field directly
 
-void Message::setUserId(const std::string& id) { impl->userId = id; }
-const std::string& Message::getUserId() const { return impl->userId; }
+void Message::setUserId(const std::string& id) { impl->setUserId(id); } // forwards to MessageImpl::setUserId
+const std::string& Message::getUserId() const { return impl->getUserId(); } // forwards to MessageImpl::getUserId
 
-void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
-const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+void Message::setCorrelationId(const std::string& id) { impl->setCorrelationId(id); } // forwards to MessageImpl::setCorrelationId
+const std::string& Message::getCorrelationId() const { return impl->getCorrelationId(); } // forwards to MessageImpl::getCorrelationId
 
-uint8_t Message::getPriority() const { return impl->priority; }
-void Message::setPriority(uint8_t priority) { impl->priority = priority; }
+uint8_t Message::getPriority() const { return impl->getPriority(); } // forwards to MessageImpl::getPriority
+void Message::setPriority(uint8_t priority) { impl->setPriority(priority); } // forwards to MessageImpl::setPriority
 
-void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
-Duration Message::getTtl() const { return Duration(impl->ttl); }
+void Message::setTtl(Duration ttl) { impl->setTtl(ttl.getMilliseconds()); } // the impl stores the TTL as a millisecond count
+Duration Message::getTtl() const { return Duration(impl->getTtl()); } // wraps the stored count back into a Duration (presumably milliseconds, mirroring setTtl -- confirm against Duration's constructor)
 
-void Message::setDurable(bool durable) { impl->durable = durable; }
-bool Message::getDurable() const { return impl->durable; }
+void Message::setDurable(bool durable) { impl->setDurable(durable); } // forwards to MessageImpl::setDurable
+bool Message::getDurable() const { return impl->isDurable(); } // the impl-side getter is named isDurable()
 
-bool Message::getRedelivered() const { return impl->redelivered; }
-void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; }
+bool Message::getRedelivered() const { return impl->isRedelivered(); } // the impl-side getter is named isRedelivered()
+void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); } // forwards to MessageImpl::setRedelivered
 
 const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
 Variant::Map& Message::getProperties() { return impl->getHeaders(); }

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.cpp Thu Feb 28 16:14:30 2013
@@ -45,28 +45,163 @@ MessageImpl::MessageImpl(const char* cha
     bytes(chars, count),
     internalId(0) {}
 
-void MessageImpl::setReplyTo(const Address& d) { replyTo = d; }
-const Address& MessageImpl::getReplyTo() const { return replyTo; }
+void MessageImpl::setReplyTo(const Address& d)
+{ // Stores the reply-to address, then lets updated() detach the message from its encoded form.
+    replyTo = d;
+    updated(); // NOTE(review): runs after the assignment, so if d tests false (!replyTo) while 'encoded' is set, updated() re-reads replyTo from the encoded message -- confirm this is intended
+}
+const Address& MessageImpl::getReplyTo() const
+{ // Returns the reply-to address, filling it from the encoded message on first use.
+    if (!replyTo && encoded) encoded->getReplyTo(replyTo); // replyTo is declared mutable, so this const getter can cache the decoded value
+    return replyTo;
+}
 
-void MessageImpl::setSubject(const std::string& s) { subject = s; }
-const std::string& MessageImpl::getSubject() const { return subject; }
+void MessageImpl::setSubject(const std::string& s)
+{ // Stores the subject, then calls updated(), which drops the encoded message.
+    subject = s;
+    updated(); // NOTE(review): with s empty and 'encoded' set, updated() refills subject from the encoded message -- confirm this is intended
+}
+const std::string& MessageImpl::getSubject() const
+{ // Returns the subject; an empty local value is filled from the encoded message when one is present.
+    if (!subject.size() && encoded) encoded->getSubject(subject);
+    return subject;
+}
 
-void MessageImpl::setContentType(const std::string& s) { contentType = s; }
-const std::string& MessageImpl::getContentType() const { return contentType; }
+void MessageImpl::setContentType(const std::string& s)
+{ // Stores the content type, then calls updated(), which drops the encoded message.
+    contentType = s;
+    updated(); // NOTE(review): with s empty and 'encoded' set, updated() refills contentType from the encoded message -- confirm this is intended
+}
+const std::string& MessageImpl::getContentType() const
+{ // Returns the content type; an empty local value is filled from the encoded message when one is present.
+    if (!contentType.size() && encoded) encoded->getContentType(contentType);
+    return contentType;
+}
 
-const Variant::Map& MessageImpl::getHeaders() const { return headers; }
-Variant::Map& MessageImpl::getHeaders() { return headers; }
-void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) { headers[key] = val; }
+void MessageImpl::setMessageId(const std::string& s)
+{ // Stores the message id, then calls updated(), which drops the encoded message.
+    messageId = s;
+    updated(); // NOTE(review): with s empty and 'encoded' set, updated() refills messageId from the encoded message -- confirm this is intended
+}
+const std::string& MessageImpl::getMessageId() const
+{ // Returns the message id; an empty local value is filled from the encoded message when one is present.
+    if (!messageId.size() && encoded) encoded->getMessageId(messageId);
+    return messageId;
+}
+void MessageImpl::setUserId(const std::string& s)
+{ // Stores the user id, then calls updated(), which drops the encoded message.
+    userId = s;
+    updated(); // NOTE(review): with s empty and 'encoded' set, updated() refills userId from the encoded message -- confirm this is intended
+}
+const std::string& MessageImpl::getUserId() const
+{ // Returns the user id; an empty local value is filled from the encoded message when one is present.
+    if (!userId.size() && encoded) encoded->getUserId(userId);
+    return userId;
+}
+void MessageImpl::setCorrelationId(const std::string& s)
+{ // Stores the correlation id, then calls updated(), which drops the encoded message.
+    correlationId = s;
+    updated(); // NOTE(review): with s empty and 'encoded' set, updated() refills correlationId from the encoded message -- confirm this is intended
+}
+const std::string& MessageImpl::getCorrelationId() const
+{ // Returns the correlation id; an empty local value is filled from the encoded message when one is present.
+    if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
+    return correlationId;
+}
+void MessageImpl::setPriority(uint8_t p)
+{ // Plain field store. NOTE(review): unlike the string setters this does not call updated(), so an attached encoded message is kept -- confirm that is intended.
+    priority = p;
+}
+uint8_t MessageImpl::getPriority() const
+{ // Returns the locally stored value; the encoded message is never consulted here.
+    return priority;
+}
+void MessageImpl::setTtl(uint64_t t)
+{ // Plain field store; does not call updated(). Message::setTtl passes a millisecond count.
+    ttl = t;
+}
+uint64_t MessageImpl::getTtl() const
+{ // Returns the locally stored value; the encoded message is never consulted here.
+    return ttl;
+}
+void MessageImpl::setDurable(bool d)
+{ // Plain field store; does not call updated().
+    durable = d;
+}
+bool MessageImpl::isDurable() const
+{ // Returns the locally stored flag.
+    return durable;
+}
+void MessageImpl::setRedelivered(bool b)
+{ // Plain field store; does not call updated().
+    redelivered = b;
+}
+bool MessageImpl::isRedelivered() const
+{ // Returns the locally stored flag.
+    return redelivered;
+}
+
+const Variant::Map& MessageImpl::getHeaders() const
+{ // Read-only view: an empty local map is populated from the encoded message when one is present; 'encoded' is kept.
+    if (!headers.size() && encoded) encoded->populate(headers);
+    return headers;
+}
+Variant::Map& MessageImpl::getHeaders() {
+    if (!headers.size() && encoded) encoded->populate(headers); // load headers before handing out a mutable reference
+    updated(); // the caller may modify the map, so the remaining fields are pulled in and 'encoded' is dropped
+    return headers;
+}
+void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val)
+{ // Sets one header entry, then calls updated().
+    headers[key] = val; updated(); // NOTE(review): the map is non-empty after the assignment, so updated() skips encoded->populate(headers) before resetting 'encoded'; headers held only in the encoded message look like they are dropped -- confirm
+}
 
 //should these methods be on MessageContent?
-void MessageImpl::setBytes(const std::string& c) { bytes = c; }
-void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); }
-const std::string& MessageImpl::getBytes() const { return bytes; }
-std::string& MessageImpl::getBytes() { return bytes; }
+void MessageImpl::setBytes(const std::string& c)
+{ // Replaces the body with a copy of c, then calls updated(), which drops the encoded message.
+    bytes = c;
+    updated(); // NOTE(review): with c empty and 'encoded' set, updated() refills bytes from the encoded body -- confirm this is intended
+}
+void MessageImpl::setBytes(const char* chars, size_t count)
+{ // Replaces the body with count bytes starting at chars, then calls updated().
+    bytes.assign(chars, count);
+    updated(); // same empty-value caveat as the std::string overload when count is 0
+}
+void MessageImpl::appendBytes(const char* chars, size_t count)
+{ // Appends count bytes starting at chars to the locally held body, then calls updated().
+    bytes.append(chars, count); // NOTE(review): appends to the local string only; if bytes was empty and the body lived in 'encoded', updated() below sees a non-empty bytes, skips getBody and resets 'encoded', so the encoded body looks lost -- confirm
+    updated();
+}
+const std::string& MessageImpl::getBytes() const
+{ // Read-only view: an empty local body is filled from the encoded message when one is present; 'encoded' is kept.
+    if (!bytes.size() && encoded) encoded->getBody(bytes);
+    return bytes;
+}
+std::string& MessageImpl::getBytes()
+{ // Mutable view: loads the body first, then detaches from the encoded message.
+    if (!bytes.size() && encoded) encoded->getBody(bytes);
+    updated();//have to assume body may be edited, invalidating our message
+    return bytes;
+}
 
 void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
 qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; }
 
+void MessageImpl::updated() // Called by the mutating accessors: copies every field still held only in 'encoded' into the members, then releases 'encoded'.
+{
+    // Each field is copied only when the local value is empty/false, so a non-empty value already set locally is kept.
+    if (!replyTo && encoded) encoded->getReplyTo(replyTo);
+    if (!subject.size() && encoded) encoded->getSubject(subject);
+    if (!contentType.size() && encoded) encoded->getContentType(contentType);
+    if (!messageId.size() && encoded) encoded->getMessageId(messageId);
+    if (!userId.size() && encoded) encoded->getUserId(userId);
+    if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
+    if (!headers.size() && encoded) encoded->populate(headers);
+    if (!bytes.size() && encoded) encoded->getBody(bytes);
+    // priority, ttl, durable and redelivered are not read from 'encoded' in this function.
+    encoded.reset(); // this object no longer references the encoded message; later getters use only the member values
+}
+
 MessageImpl& MessageImplAccess::get(Message& msg)
 {
     return *msg.impl;

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/MessageImpl.h Thu Feb 28 16:14:30 2013
@@ -24,52 +24,77 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/types/Variant.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/messaging/amqp/EncodedMessage.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace messaging {
 
-struct MessageImpl
+class MessageImpl // Message state behind qpid::messaging::Message; fields are now private and reached through accessors.
 {
-    Address replyTo;
-    std::string subject;
-    std::string contentType;
-    std::string messageId;
-    std::string userId;
-    std::string correlationId;
+  private:
+    mutable Address replyTo; // mutable: the const getters in MessageImpl.cpp fill these in lazily from 'encoded'
+    mutable std::string subject;
+    mutable std::string contentType;
+    mutable std::string messageId;
+    mutable std::string userId;
+    mutable std::string correlationId;
     uint8_t priority;
     uint64_t ttl;
     bool durable;
     bool redelivered;
-    qpid::types::Variant::Map headers;
+    mutable qpid::types::Variant::Map headers; // filled via encoded->populate() when empty
 
-    std::string bytes;
+    mutable std::string bytes; // filled via encoded->getBody() when empty
+    boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded; // may be null; the accessors test it before use and updated() resets it
 
     qpid::framing::SequenceNumber internalId;
 
+    void updated(); // copies still-unset fields out of 'encoded', then releases it
+  public:
     MessageImpl(const std::string& c);
     MessageImpl(const char* chars, size_t count);
 
     void setReplyTo(const Address& d);
     const Address& getReplyTo() const;
-    
+
     void setSubject(const std::string& s);
     const std::string& getSubject() const;
-    
+
     void setContentType(const std::string& s);
     const std::string& getContentType() const;
-    
+
+    void setMessageId(const std::string&); // accessors added here replace direct field access from Message.cpp
+    const std::string& getMessageId() const;
+    void setUserId(const std::string& );
+    const std::string& getUserId() const;
+    void setCorrelationId(const std::string& );
+    const std::string& getCorrelationId() const;
+    void setPriority(uint8_t);
+    uint8_t getPriority() const;
+    void setTtl(uint64_t); // Message::setTtl passes Duration::getMilliseconds()
+    uint64_t getTtl() const;
+    void setDurable(bool);
+    bool isDurable() const;
+    void setRedelivered(bool);
+    bool isRedelivered() const;
+
+
     const qpid::types::Variant::Map& getHeaders() const;
     qpid::types::Variant::Map& getHeaders();
     void setHeader(const std::string& key, const qpid::types::Variant& val);
-    
+
     void setBytes(const std::string& bytes);
     void setBytes(const char* chars, size_t count);
+    void appendBytes(const char* chars, size_t count); // appends to the locally held body
     const std::string& getBytes() const;
     std::string& getBytes();
 
     void setInternalId(qpid::framing::SequenceNumber id);
     qpid::framing::SequenceNumber getInternalId();
-
+    void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; } // attaches an encoded message; members already set are left untouched
+    boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; } // returns a null pointer once updated() has run
 };
 
 class Message;

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/ReceiverImpl.h Thu Feb 28 16:14:30 2013
@@ -22,10 +22,12 @@
  *
  */
 #include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
 
 namespace qpid {
 namespace messaging {
 
+class Duration;
 class Message;
 class MessageListener;
 class Session;

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/SenderImpl.h Thu Feb 28 16:14:30 2013
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
 
 namespace qpid {
 namespace messaging {

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/CMakeLists.txt Thu Feb 28 16:14:30 2013
@@ -42,6 +42,7 @@ if (CMAKE_COMPILER_IS_GNUCXX)
 
   set_target_properties (store PROPERTIES
                          PREFIX ""
+                         COMPILE_DEFINITIONS _IN_QPID_BROKER
                          LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
 endif (CMAKE_COMPILER_IS_GNUCXX)
 
@@ -54,7 +55,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
   endif (MSVC)
 endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
 
-set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+set_target_properties (store PROPERTIES
+                       COMPILE_DEFINITIONS _IN_QPID_BROKER
+                       VERSION ${qpidc_version})
 install (TARGETS store # RUNTIME
          DESTINATION ${QPIDD_MODULE_DIR}
          COMPONENT ${QPID_COMPONENT_BROKER})
@@ -81,6 +84,7 @@ if (BUILD_MSSQL)
                ms-sql/State.cpp
                ms-sql/TplRecordset.cpp
                ms-sql/VariantHelper.cpp)
+  set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
   target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
   install (TARGETS mssql_store # RUNTIME
            DESTINATION ${QPIDD_MODULE_DIR}
@@ -110,6 +114,7 @@ if (BUILD_MSCLFS)
                ms-sql/State.cpp
                ms-sql/VariantHelper.cpp)
   include_directories(ms-sql)
+  set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
   target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib)
   install (TARGETS msclfs_store # RUNTIME
            DESTINATION ${QPIDD_MODULE_DIR}

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp Thu Feb 28 16:14:30 2013
@@ -137,12 +137,6 @@ MessageStorePlugin::providerAvailable(co
         QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
 }
 
-void
-MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
-{
-    QPID_LOG(info, "Store: truncateInit");
-}
-
 
 /**
  * Record the existence of a durable queue

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.h Thu Feb 28 16:14:30 2013
@@ -24,18 +24,22 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
-#include "qpid/broker/Broker.h"
 #include "qpid/broker/MessageStore.h"
-#include "qpid/broker/PersistableExchange.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/management/Manageable.h"
+//#include "qpid/management/Manageable.h"
 
 #include <string>
 
 using namespace qpid;
 
 namespace qpid {
+
+namespace broker {
+class Broker;
+class PersistableExchange;
+class PersistableMessage;
+class PersistableQueue;
+}
+
 namespace store {
 
 class StorageProvider;
@@ -82,18 +86,6 @@ class MessageStorePlugin :
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called before recovery, will discard the database and reinitialize
-     * using an empty store. This is used when cluster nodes recover and
-     * must get their content from a cluster sync rather than directly from
-     * the store.
-     *
-     * @param saveStoreContent    If true, the store's contents should be
-     *                            saved to a backup location before
-     *                            reinitializing the store content.
-     */
-    virtual void truncateInit(const bool saveStoreContent = false);
 
     /**
      * Record the existence of a durable queue

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/StorageProvider.h Thu Feb 28 16:14:30 2013
@@ -143,20 +143,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get thier content from a cluster sync rather than directly fromt the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false) = 0;
 
     /**
      * Record the existence of a durable queue

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Thu Feb 28 16:14:30 2013
@@ -26,6 +26,7 @@
 #include <string>
 #include <windows.h>
 #include <clfsw32.h>
+#include <qpid/broker/Broker.h>
 #include <qpid/broker/RecoverableQueue.h>
 #include <qpid/log/Statement.h>
 #include <qpid/store/MessageStorePlugin.h>
@@ -108,20 +109,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get their content from a cluster sync rather than directly from the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false);
 
     /**
      * Record the existence of a durable queue
@@ -467,11 +454,6 @@ MSSqlClfsProvider::activate(MessageStore
 }
 
 void
-MSSqlClfsProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
 MSSqlClfsProvider::create(PersistableQueue& queue,
                           const qpid::framing::FieldTable& /*args needed for jrnl*/)
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Thu Feb 28 16:14:30 2013
@@ -92,20 +92,6 @@ public:
     /**
      * @name Methods inherited from qpid::broker::MessageStore
      */
-    //@{
-    /**
-     * If called after init() but before recovery, will discard the database
-     * and reinitialize using an empty store dir. If @a pushDownStoreFiles
-     * is true, the content of the store dir will be moved to a backup dir
-     * inside the store dir. This is used when cluster nodes recover and must
-     * get thier content from a cluster sync rather than directly fromt the
-     * store.
-     *
-     * @param pushDownStoreFiles If true, will move content of the store dir
-     *                           into a subdir, leaving the store dir
-     *                           otherwise empty.
-     */
-    virtual void truncateInit(const bool pushDownStoreFiles = false);
 
     /**
      * Record the existence of a durable queue
@@ -392,11 +378,6 @@ MSSqlProvider::activate(MessageStorePlug
 }
 
 void
-MSSqlProvider::truncateInit(const bool pushDownStoreFiles)
-{
-}
-
-void
 MSSqlProvider::create(PersistableQueue& queue,
                       const qpid::framing::FieldTable& /*args needed for jrnl*/)
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp Thu Feb 28 16:14:30 2013
@@ -147,7 +147,7 @@ MessageRecordset::recover(qpid::broker::
 
         // Now, do we need the rest of the content?
         long contentLength = blobSize - headerFieldLength - headerSize;
-        if (msg->loadContent(contentLength)) {
+        if (contentLength > 0 && msg->loadContent(contentLength)) {
             BlobAdapter content(contentLength);
              content =
                 rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.cpp Thu Feb 28 16:14:30 2013
@@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.
 
 void AggregateOutput::activateOutput() { control.activateOutput(); }
 
-void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-
 namespace {
 // Clear the busy flag and notify waiting threads in destructor.
 struct ScopedBusy {
@@ -51,6 +49,7 @@ bool AggregateOutput::doOutput() {
     while (!tasks.empty()) {
         OutputTask* t=tasks.front();
         tasks.pop_front();
+        taskSet.erase(t);
         bool didOutput;
         {
             // Allow concurrent call to addOutputTask.
@@ -59,7 +58,9 @@ bool AggregateOutput::doOutput() {
             didOutput = t->doOutput();
         }
         if (didOutput) {
-            tasks.push_back(t);
+            if (taskSet.insert(t).second) {
+                tasks.push_back(t);
+            }
             return true;
         }
     }
@@ -68,12 +69,15 @@ bool AggregateOutput::doOutput() {
   
 void AggregateOutput::addOutputTask(OutputTask* task) {
     Mutex::ScopedLock l(lock);
-    tasks.push_back(task);
+    if (taskSet.insert(task).second) {
+        tasks.push_back(task);
+    }
 }
 
 void AggregateOutput::removeOutputTask(OutputTask* task) {
     Mutex::ScopedLock l(lock);
     while (busy) lock.wait();
+    taskSet.erase(task);
     tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
 }
   
@@ -81,6 +85,7 @@ void AggregateOutput::removeAll()
 {
     Mutex::ScopedLock l(lock);
     while (busy) lock.wait();
+    taskSet.clear();
     tasks.clear();
 }
   

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AggregateOutput.h Thu Feb 28 16:14:30 2013
@@ -28,6 +28,7 @@
 
 #include <algorithm>
 #include <deque>
+#include <set>
 
 namespace qpid {
 namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
 class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
 {
     typedef std::deque<OutputTask*> TaskList;
+    typedef std::set<OutputTask*> TaskSet;
 
     Monitor lock;
     TaskList tasks;
+    TaskSet taskSet;
     bool busy;
     OutputControl& control;
 
@@ -56,7 +59,6 @@ class QPID_COMMON_CLASS_EXTERN Aggregate
     // These may be called concurrently with any function.
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();
-    QPID_COMMON_EXTERN void giveReadCredit(int32_t);
     QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
 
     // These functions must not be called concurrently with each other.

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h Thu Feb 28 16:14:30 2013
@@ -21,9 +21,11 @@
  *
  */
 
-#include "qpid/sys/IntegerTypes.h"
 #include "qpid/CommonImportExport.h"
 
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/SecuritySettings.h"
+
 #include <string.h>
 
 #include <boost/function.hpp>
@@ -56,6 +58,7 @@ class AsynchConnector {
 public:
     typedef boost::function1<void, const Socket&> ConnectedCallback;
     typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback;
+    typedef boost::function1<void, AsynchConnector&> RequestCallback;
 
     // Call create() to allocate a new AsynchConnector object with the
     // specified poller, addressing, and callbacks.
@@ -70,6 +73,7 @@ public:
                                    FailedCallback failCb);
     virtual void start(boost::shared_ptr<Poller> poller) = 0;
     virtual void stop() {};
+    virtual void requestCallback(RequestCallback) = 0;
 protected:
     AsynchConnector() {}
     virtual ~AsynchConnector() {}
@@ -155,11 +159,11 @@ public:
     virtual void notifyPendingWrite() = 0;
     virtual void queueWriteClose() = 0;
     virtual bool writeQueueEmpty() = 0;
-    virtual void startReading() = 0;
-    virtual void stopReading() = 0;
     virtual void requestCallback(RequestCallback) = 0;
     virtual BufferBase* getQueuedBuffer() = 0;
 
+    virtual SecuritySettings getSecuritySettings() = 0;
+
 protected:
     // Derived class manages lifetime; must be constructed using the
     // static create() method. Deletes not allowed from outside.

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Thu Feb 28 16:14:30 2013
@@ -51,15 +51,15 @@ struct ProtocolTimeoutTask : public sys:
     }
 };
 
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
     reads(0),
     readError(false),
-    isClient(false),
-    readCredit(InfiniteCredit)
+    isClient(isClient0),
+    nodict(nodict0)
 {}
 
 AsynchIOHandler::~AsynchIOHandler() {
@@ -97,25 +97,20 @@ void AsynchIOHandler::abort() {
     if (!readError) {
         aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
     }
+    aio->queueWriteClose();
 }
 
 void AsynchIOHandler::activateOutput() {
     aio->notifyPendingWrite();
 }
 
-// Input side
-void AsynchIOHandler::giveReadCredit(int32_t credit) {
-    // Check whether we started in the don't about credit state
-    if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
-        return;
-    // TODO In theory should be able to use an atomic operation before taking the lock
-    // but in practice there seems to be an unexplained race in that case
-    ScopedLock<Mutex> l(creditLock);
-    if (readCredit.fetchAndAdd(credit) != 0)
-        return;
-    assert(readCredit.get() >= 0);
-    if (readCredit.get() != 0)
-        aio->startReading();
+namespace {
+    SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
+    {
+        SecuritySettings settings = aio->getSecuritySettings();
+        settings.nodict = nodict;
+        return settings;
+    }
 }
 
 void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
@@ -123,26 +118,6 @@ void AsynchIOHandler::readbuff(AsynchIO&
         return;
     }
 
-    // Check here for read credit
-    if (readCredit.get() != InfiniteCredit) {
-        if (readCredit.get() == 0) {
-            // FIXME aconway 2009-10-01:  Workaround to avoid "false wakeups".
-            // readbuff is sometimes called with no credit.
-            // This should be fixed somewhere else to avoid such calls.
-            aio->unread(buff);
-            return;
-        }
-        // TODO In theory should be able to use an atomic operation before taking the lock
-        // but in practice there seems to be an unexplained race in that case
-        ScopedLock<Mutex> l(creditLock);
-        if (--readCredit == 0) {
-            assert(readCredit.get() >= 0);
-            if (readCredit.get() == 0) {
-                aio->stopReading();
-            }
-        }
-    }
-
     ++reads;
     size_t decoded = 0;
     if (codec) {                // Already initiated
@@ -168,13 +143,16 @@ void AsynchIOHandler::readbuff(AsynchIO&
 
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
             try {
-                codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
+                codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict));
                 if (!codec) {
                     //TODO: may still want to revise this...
                     //send valid version header & close connection.
                     write(framing::ProtocolInitiation(framing::highestProtocolVersion));
                     readError = true;
                     aio->queueWriteClose();
+                } else {
+                    //read any further data that may already have been sent
+                    decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
                 }
             } catch (const std::exception& e) {
                 QPID_LOG(error, e.what());
@@ -223,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&)
 
 void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
-        codec = factory->create(*this, identifier, SecuritySettings());
+        codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
         write(framing::ProtocolInitiation(codec->getVersion()));
         // We've just sent the protocol negotiation so we can cancel the timeout for that
         // This is not ideal, because we've not received anything yet, but heartbeats will

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Thu Feb 28 16:14:30 2013
@@ -51,24 +51,19 @@ class AsynchIOHandler : public OutputCon
     uint32_t reads;
     bool readError;
     bool isClient;
-    AtomicValue<int32_t> readCredit;
-    static const int32_t InfiniteCredit = -1;
-    Mutex creditLock;
+    bool nodict;
     boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
 
     void write(const framing::ProtocolInitiation&);
 
   public:
-    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
+    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
     QPID_COMMON_EXTERN ~AsynchIOHandler();
     QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
 
-    QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
     // Output side
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();
-    QPID_COMMON_EXTERN void giveReadCredit(int32_t credit);
 
     // Input side
     QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Codec.h Thu Feb 28 16:14:30 2013
@@ -42,7 +42,7 @@ class Codec
 
 
     /** Encode into buffer, return number of bytes encoded */
-    virtual std::size_t encode(const char* buffer, std::size_t size) = 0;
+    virtual std::size_t encode(char* buffer, std::size_t size) = 0;
 
     /** Return true if we have data to encode */
     virtual bool canEncode() = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Thu Feb 28 16:14:30 2013
@@ -45,7 +45,6 @@ class ConnectionOutputHandlerPtr : publi
     size_t getBuffered() const { return next->getBuffered(); }
     void abort() { next->abort(); }
     void activateOutput() { next->activateOutput(); }
-    void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
     void send(framing::AMQFrame& f) { next->send(f); }
 
   private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/FileSysDir.h Thu Feb 28 16:14:30 2013
@@ -54,6 +54,15 @@ class FileSysDir
 
     void mkdir(void);
 
+    typedef void Callback(const std::string&);
+
+    /**
+     * Call the Callback function for every regular file in the directory
+     *
+     * @param cb Callback function that receives the full path to the file
+     */
+    void forEachFile(Callback cb) const;
+
     std::string getPath   () { return dirPath; }
 };
  

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/OutputControl.h Thu Feb 28 16:14:30 2013
@@ -1,3 +1,6 @@
+#ifndef QPID_SYS_OUTPUT_CONTROL_H
+#define QPID_SYS_OUTPUT_CONTROL_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,9 +24,6 @@
 
 #include "qpid/sys/IntegerTypes.h"
 
-#ifndef _OutputControl_
-#define _OutputControl_
-
 namespace qpid {
 namespace sys {
 
@@ -33,11 +33,10 @@ namespace sys {
         virtual ~OutputControl() {}
         virtual void abort() = 0;
         virtual void activateOutput() = 0;
-        virtual void giveReadCredit(int32_t credit) = 0;
     };
 
 }
 }
 
 
-#endif
+#endif /*!QPID_SYS_OUTPUT_CONTROL_H*/

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ProtocolFactory.h Thu Feb 28 16:14:30 2013
@@ -42,10 +42,10 @@ class ProtocolFactory : public qpid::Sha
     virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
+        const std::string& name,
         const std::string& host, const std::string& port,
         ConnectionCodec::Factory* codec,
         ConnectFailedCallback failed) = 0;
-    virtual bool supports(const std::string& /*capability*/) { return false; }
 };
 
 inline ProtocolFactory::~ProtocolFactory() {}

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/RdmaIOPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/rdma/RdmaIO.h"
@@ -67,7 +68,6 @@ class RdmaIOHandler : public OutputContr
     void close();
     void abort();
     void activateOutput();
-    void giveReadCredit(int32_t credit);
     void initProtocolOut();
 
     // Input side
@@ -83,7 +83,7 @@ class RdmaIOHandler : public OutputContr
 };
 
 RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
-    identifier(c->getFullName()),
+    identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
     factory(f),
     codec(0),
     readError(false),
@@ -199,10 +199,6 @@ void RdmaIOHandler::full(Rdma::AsynchIO&
     QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
 }
 
-// TODO: Dummy implementation of read throttling
-void RdmaIOHandler::giveReadCredit(int32_t) {
-}
-
 // The logic here is subtly different from TCP as RDMA is message oriented
 // so we define that an RDMA message is a frame - in this case there is no putting back
 // of any message remainder - there shouldn't be any. And what we read here can't be
@@ -250,7 +246,7 @@ class RdmaIOProtocolFactory : public Pro
   public:
     RdmaIOProtocolFactory(int16_t port, int backlog);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+    void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
 
     uint16_t getPort() const;
 
@@ -371,6 +367,7 @@ void RdmaIOProtocolFactory::connected(Po
 
 void RdmaIOProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& /*name*/,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* f,
     ConnectFailedCallback failed)

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SecurityLayer.h Thu Feb 28 16:14:30 2013
@@ -33,8 +33,12 @@ namespace sys {
 class SecurityLayer : public Codec
 {
   public:
+    SecurityLayer(int ssf_) : ssf(ssf_) {}
+    int getSsf() const { return ssf; }
     virtual void init(Codec*) = 0;
     virtual ~SecurityLayer() {}
+  private:
+    int ssf;
 };
 
 }} // namespace qpid::sys

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SecuritySettings.h Thu Feb 28 16:14:30 2013
@@ -21,6 +21,8 @@
  * under the License.
  *
  */
+#include <string>
+
 namespace qpid {
 namespace sys {
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Socket.h Thu Feb 28 16:14:30 2013
@@ -22,7 +22,6 @@
  *
  */
 
-#include "qpid/sys/IOHandle.h"
 #include "qpid/sys/IntegerTypes.h"
 #include "qpid/CommonImportExport.h"
 #include <string>
@@ -31,45 +30,43 @@ namespace qpid {
 namespace sys {
 
 class Duration;
+class IOHandle;
 class SocketAddress;
 
-class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
+class Socket
 {
 public:
-    /** Create a socket wrapper for descriptor. */
-    QPID_COMMON_EXTERN Socket();
+    virtual ~Socket() {};
 
-    /** Create a new Socket which is the same address family as this one */
-    QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
+    virtual operator const IOHandle&() const = 0;
 
     /** Set socket non blocking */
-    void setNonblocking() const;
+    virtual void setNonblocking() const = 0;
 
-    QPID_COMMON_EXTERN void setTcpNoDelay() const;
+    virtual void setTcpNoDelay() const = 0;
 
-    QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
-    QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
+    virtual void connect(const SocketAddress&) const = 0;
+    virtual void finishConnect(const SocketAddress&) const = 0;
 
-    QPID_COMMON_EXTERN void close() const;
+    virtual void close() const = 0;
 
     /** Bind to a port and start listening.
      *@param port 0 means choose an available port.
      *@param backlog maximum number of pending connections.
      *@return The bound port.
      */
-    QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
-    QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+    virtual int listen(const SocketAddress&, int backlog = 10) const = 0;
 
     /**
      * Returns an address (host and port) for the remote end of the
      * socket
      */
-    QPID_COMMON_EXTERN std::string getPeerAddress() const;
+    virtual std::string getPeerAddress() const = 0;
     /**
      * Returns an address (host and port) for the local end of the
      * socket
      */
-    QPID_COMMON_EXTERN std::string getLocalAddress() const;
+    virtual std::string getLocalAddress() const = 0;
 
     /**
      * Returns the full address of the connection: local and remote host and port.
@@ -80,31 +77,24 @@ public:
      * Returns the error code stored in the socket.  This may be used
      * to determine the result of a non-blocking connect.
      */
-    QPID_COMMON_EXTERN int getError() const;
+    virtual int getError() const = 0;
 
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
      */
-    QPID_COMMON_EXTERN Socket* accept() const;
+    virtual Socket* accept() const = 0;
 
-    // TODO The following are raw operations, maybe they need better wrapping?
-    QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
-    QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
-
-private:
-    /** Create socket */
-    void createSocket(const SocketAddress&) const;
+    virtual int read(void *buf, size_t count) const = 0;
+    virtual int write(const void *buf, size_t count) const = 0;
 
-public:
-    /** Construct socket with existing handle */
-    Socket(IOHandlePrivate*);
-
-protected:
-    mutable std::string localname;
-    mutable std::string peername;
-    mutable bool nonblocking;
-    mutable bool nodelay;
+    /* Transport security related: */
+    virtual int getKeyLen() const = 0;
+    virtual std::string getClientAuthId() const = 0;
 };
 
+/** Make the default socket for whatever platform we are executing on
+ */
+QPID_COMMON_EXTERN Socket* createSocket();
+
 }}
 #endif  /*!_sys_Socket_h*/

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SocketAddress.h Thu Feb 28 16:14:30 2013
@@ -44,11 +44,12 @@ public:
 
     QPID_COMMON_EXTERN bool nextAddress();
     QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN std::string getHost() const;
     QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
 
     QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
     QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
-    
+
 
 private:
     std::string host;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Thu Feb 28 16:14:30 2013
@@ -22,19 +22,19 @@
 #include "qpid/sys/ProtocolFactory.h"
 
 #include "qpid/Plugin.h"
-#include "qpid/sys/ssl/check.h"
-#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/util.h"
 #include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/Poller.h"
 
 #include <boost/bind.hpp>
-#include <memory>
-
+#include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
 namespace sys {
@@ -64,38 +64,32 @@ struct SslServerOptions : ssl::SslOption
     }
 };
 
-template <class T>
-class SslProtocolFactoryTmpl : public ProtocolFactory {
-  private:
-
-    typedef SslAcceptorTmpl<T> SslAcceptor;
-
+class SslProtocolFactory : public ProtocolFactory {
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
     Timer& brokerTimer;
     uint32_t maxNegotiateTime;
+    uint16_t listeningPort;
     const bool tcpNoDelay;
-    T listener;
-    const uint16_t listeningPort;
-    std::auto_ptr<SslAcceptor> acceptor;
     bool nodict;
 
   public:
-    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
+    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+                       Timer& timer);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+    void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
-                 boost::function2<void, int, std::string> failed);
+                 ConnectFailedCallback);
 
     uint16_t getPort() const;
-    bool supports(const std::string& capability);
 
   private:
-    void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
-                     bool isClient);
+    void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+    void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
+    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
+    void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
 };
 
-typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
-typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
-
 
 // Static instance to initialise plugin
 static struct SslPlugin : public Plugin {
@@ -124,7 +118,7 @@ static struct SslPlugin : public Plugin 
             }
         }
     }
-    
+
     void initialize(Target& target) {
         QPID_LOG(trace, "Initialising SSL plugin");
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
@@ -139,19 +133,16 @@ static struct SslPlugin : public Plugin 
 
                     const broker::Broker::Options& opts = broker->getOptions();
 
-                    ProtocolFactory::shared_ptr protocol(options.multiplex ?
-                        static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
-                                                  opts.connectionBacklog,
-                                                  opts.tcpNoDelay,
-                                                  broker->getTimer(), opts.maxNegotiateTime)) :
-                        static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
-                                               opts.connectionBacklog,
-                                               opts.tcpNoDelay,
-                                               broker->getTimer(), opts.maxNegotiateTime)));
-                    QPID_LOG(notice, "Listening for " <<
-                                     (options.multiplex ? "SSL or TCP" : "SSL") <<
-                                     " connections on TCP port " <<
-                                     protocol->getPort());
+                    ProtocolFactory::shared_ptr protocol(
+                        static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
+
+                    if (protocol->getPort()!=0 ) {
+                        QPID_LOG(notice, "Listening for " <<
+                                        (options.multiplex ? "SSL or TCP" : "SSL") <<
+                                        " connections on TCP/TCP6 port " <<
+                                        protocol->getPort());
+                    }
+
                     broker->registerProtocolFactory("ssl", protocol);
                 } catch (const std::exception& e) {
                     QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -161,99 +152,133 @@ static struct SslPlugin : public Plugin 
     }
 } sslPlugin;
 
-template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+namespace {
+    // Expand list of Interfaces and addresses to a list of addresses
+    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+        std::vector<std::string> addresses;
+        // If there are no specific interfaces listed use a single "" to listen on every interface
+        if (interfaces.empty()) {
+            addresses.push_back("");
+            return addresses;
+        }
+        for (unsigned i = 0; i < interfaces.size(); ++i) {
+            const std::string& interface = interfaces[i];
+            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+                // We don't have an interface of that name -
+                // Check for IPv6 ('[' ']') brackets and remove them
+                // then pass to be looked up directly
+                if (interface[0]=='[' && interface[interface.size()-1]==']') {
+                    addresses.push_back(interface.substr(1, interface.size()-2));
+                } else {
+                    addresses.push_back(interface);
+                }
+            }
+        }
+        return addresses;
+    }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
+                                       Timer& timer) :
     brokerTimer(timer),
-    maxNegotiateTime(maxTime),
-    tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
+    maxNegotiateTime(opts.maxNegotiateTime),
+    tcpNoDelay(opts.tcpNoDelay),
     nodict(options.nodict)
-{}
-
-void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
-                    ConnectionCodec::Factory* f, bool isClient,
-                    Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
-    qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
-
-    if (tcpNoDelay) {
-        s.setTcpNoDelay(tcpNoDelay);
-        QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+{
+    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+    if (addresses.empty()) {
+        // We specified some interfaces, but couldn't find addresses for them
+        QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
+        listeningPort = 0;
     }
 
-    if (isClient) {
-        async->setClient();
+    for (unsigned i = 0; i<addresses.size(); ++i) {
+        QPID_LOG(debug, "Using interface: " << addresses[i]);
+        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
+
+        // We must have at least one resolved address
+        QPID_LOG(info, "Listening to: " << sa.asString())
+        Socket* s = options.multiplex ?
+            new SslMuxSocket(options.certName, options.clientAuth) :
+            new SslSocket(options.certName, options.clientAuth);
+        uint16_t lport = s->listen(sa, opts.connectionBacklog);
+        QPID_LOG(debug, "Listened to: " << lport);
+        listeners.push_back(s);
+
+        listeningPort = lport;
+
+        // Try any other resolved addresses
+        while (sa.nextAddress()) {
+            // Hack to ensure that all listening connections are on the same port
+            sa.setAddrInfoPort(listeningPort);
+            QPID_LOG(info, "Listening to: " << sa.asString())
+            Socket* s = options.multiplex ?
+                new SslMuxSocket(options.certName, options.clientAuth) :
+                new SslSocket(options.certName, options.clientAuth);
+            uint16_t lport = s->listen(sa, opts.connectionBacklog);
+            QPID_LOG(debug, "Listened to: " << lport);
+            listeners.push_back(s);
+        }
     }
-
-    qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
-                                 boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
-                                 boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
-                                 boost::bind(&qpid::sys::ssl::SslHandler::disconnect, async, _1),
-                                 boost::bind(&qpid::sys::ssl::SslHandler::closedSocket, async, _1, _2),
-                                 boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
-                                 boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
-
-    async->init(aio,timer, maxTime);
-    aio->start(poller);
 }
 
-template <>
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
-                                     ConnectionCodec::Factory* f, bool isClient) {
-    const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
-    SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
+                                          ConnectionCodec::Factory* f) {
+    AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+    establishedCommon(async, poller, s);
 }
 
-template <class T>
-uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
-    return listeningPort; // Immutable no need for lock.
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
+                                             ConnectionCodec::Factory* f, const std::string& name) {
+    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+    establishedCommon(async, poller, s);
 }
 
-template <class T>
-void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
-                                       ConnectionCodec::Factory* fact) {
-    acceptor.reset(
-        new SslAcceptor(listener,
-                        boost::bind(&SslProtocolFactoryTmpl<T>::established,
-                                    this, poller, _1, fact, false)));
-    acceptor->start(poller);
-}
-
-template <>
-void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
-                                        ConnectionCodec::Factory* f, bool isClient) {
-    const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
-
-    if (sslSock) {
-        SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
-        return;
-    }
-
-    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
-
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
     }
 
-    if (isClient) {
-        async->setClient();
-    }
-    AsynchIO* aio = AsynchIO::create
-      (s,
-       boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-       boost::bind(&AsynchIOHandler::eof, async, _1),
-       boost::bind(&AsynchIOHandler::disconnect, async, _1),
-       boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-       boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-       boost::bind(&AsynchIOHandler::idle, async, _1));
+    AsynchIO* aio = AsynchIO::create(
+        s,
+        boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, async, _1),
+        boost::bind(&AsynchIOHandler::disconnect, async, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+        boost::bind(&AsynchIOHandler::idle, async, _1));
 
     async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 
-template <class T>
-void SslProtocolFactoryTmpl<T>::connect(
+uint16_t SslProtocolFactory::getPort() const {
+    return listeningPort; // Immutable no need for lock.
+}
+
+void SslProtocolFactory::accept(Poller::shared_ptr poller,
+                                ConnectionCodec::Factory* fact) {
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
+        acceptors[i].start(poller);
+    }
+}
+
+void SslProtocolFactory::connectFailed(
+    const Socket& s, int ec, const std::string& emsg,
+    ConnectFailedCallback failedCb)
+{
+    failedCb(ec, emsg);
+    s.close();
+    delete &s;
+}
+
+void SslProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& name,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
@@ -264,31 +289,23 @@ void SslProtocolFactoryTmpl<T>::connect(
     // shutdown.  The allocated SslConnector frees itself when it
     // is no longer needed.
 
-    qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
-    new SslConnector(*socket, poller, host, port,
-                     boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
-                     failed);
-}
-
-namespace
-{
-const std::string SSL = "ssl";
-}
-
-template <>
-bool SslProtocolFactory::supports(const std::string& capability)
-{
-    std::string s = capability;
-    transform(s.begin(), s.end(), s.begin(), tolower);
-    return s == SSL;
-}
-
-template <>
-bool SslMuxProtocolFactory::supports(const std::string& capability)
-{
-    std::string s = capability;
-    transform(s.begin(), s.end(), s.begin(), tolower);
-    return s == SSL || s == "tcp";
+    Socket* socket = new qpid::sys::ssl::SslSocket();
+    try {
+    AsynchConnector* c = AsynchConnector::create(
+        *socket,
+        host,
+        port,
+        boost::bind(&SslProtocolFactory::establishedOutgoing,
+                    this, poller, _1, fact, name),
+        boost::bind(&SslProtocolFactory::connectFailed,
+                    this, _1, _2, _3, failed));
+    c->start(poller);
+    } catch (std::exception&) {
+        // TODO: Design question - should we do the error callback and also throw?
+        int errCode = socket->getError();
+        connectFailed(*socket, errCode, strError(errCode), failed);
+        throw;
+    }
 }
 
 }} // namespace qpid::sys



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org