You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [13/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h Fri Oct 21 14:42:12 2011
@@ -74,11 +74,9 @@ class UpdateDataExchange : public broker
     void updateManagementAgent(management::ManagementAgent* agent);
 
   private:
-    MemberId clusterId;
     std::string managementAgents;
     std::string managementSchemas;
     std::string managementDeletedObjects;
-  friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Message.h"
 #include "UpdateExchange.h"
 
@@ -27,6 +28,8 @@ namespace cluster {
 
 using framing::MessageTransferBody;
 using framing::DeliveryProperties;
+using framing::MessageProperties;
+using framing::FieldTable;
 
 UpdateExchange::UpdateExchange(management::Manageable* parent)
     : broker::Exchange(UpdateClient::UPDATE, parent),
@@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(managemen
 
 
 void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+    // Copy exchange name to destination property.
     MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
     assert(transfer);
     const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const
         transfer->setDestination(props->getExchange());
     else
         transfer->clearDestinationFlag();
-}
 
+    // Copy expiration from x-property if present.
+    if (msg->hasProperties<MessageProperties>()) {
+        const MessageProperties* mprops = msg->getProperties<MessageProperties>();
+        if (mprops->hasApplicationHeaders()) {
+            const FieldTable& headers = mprops->getApplicationHeaders();
+            if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
+                msg->setExpiration(
+                    sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
+                msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
+                // Erase props/headers that were added by the UpdateClient
+                if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
+                    msg->eraseProperties<MessageProperties>();
+                else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
+                    msg->clearApplicationHeadersFlag();
+            }
+        }
+    }
+}
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h Fri Oct 21 14:42:12 2011
@@ -39,6 +39,20 @@ class UpdateReceiver {
 
     /** Management-id for the next shadow connection */
     std::string nextShadowMgmtId;
+
+    /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+     * and the association with a session, either suspended or current.
+     */
+    struct DtxBufferRef {
+        std::string xid;
+        uint32_t index;         // Index in WorkRecord in DtxManager
+        bool suspended;         // Is this a suspended or current transaction?
+        broker::SemanticState* semanticState; // Associated session
+        DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
+            : xid(x), index(i), suspended(s), semanticState(ss) {}
+    };
+    typedef std::vector<DtxBufferRef> DtxBuffers;
+    DtxBuffers dtxBuffers;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h Fri Oct 21 14:42:12 2011
@@ -24,6 +24,7 @@
 
 #include "config.h"
 #include "qpid/Url.h"
+#include "qpid/RefCounted.h"
 #include "qpid/sys/IntegerTypes.h"
 #include <boost/intrusive_ptr.hpp>
 #include <utility>

Modified: qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp Fri Oct 21 14:42:12 2011
@@ -362,12 +362,11 @@ void SessionManager::handleCommandComple
 
 void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
 {
-    uint8_t kind;
     string packageName;
     string className;
     uint8_t hash[16];
 
-    kind = inBuffer.getOctet();
+    /*kind*/ (void) inBuffer.getOctet();
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h Fri Oct 21 14:42:12 2011
@@ -46,7 +46,7 @@ struct AMQBodyConstVisitor {
     virtual void visit(const AMQMethodBody&) = 0;
 };
 
-class AMQBody : public RefCounted {
+class QPID_COMMON_CLASS_EXTERN AMQBody : public RefCounted {
   public:
     AMQBody() {}
     QPID_COMMON_EXTERN virtual ~AMQBody();

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
 namespace qpid {
 namespace framing {
 
-class AMQContentBody :  public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQContentBody :  public AMQBody
 {
     string data;
 
@@ -37,15 +37,15 @@ public:
     QPID_COMMON_EXTERN AMQContentBody();
     QPID_COMMON_EXTERN AMQContentBody(const string& data);
     inline virtual ~AMQContentBody(){}
-    QPID_COMMON_EXTERN inline uint8_t type() const { return CONTENT_BODY; };
-    QPID_COMMON_EXTERN inline const string& getData() const { return data; }
-    QPID_COMMON_EXTERN inline string& getData() { return data; }
+    inline uint8_t type() const { return CONTENT_BODY; };
+    inline const string& getData() const { return data; }
+    inline string& getData() { return data; }
     QPID_COMMON_EXTERN uint32_t encodedSize() const;
     QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
     QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size);
     QPID_COMMON_EXTERN void print(std::ostream& out) const;
-    QPID_COMMON_EXTERN void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
-    QPID_COMMON_EXTERN boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
+    void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+    boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
 };
 
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp Fri Oct 21 14:42:12 2011
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
     return true;
 }
 
+void AMQFrame::cloneBody()
+{
+    body = body->clone();
+}
+
 std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
 {
     return

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h Fri Oct 21 14:42:12 2011
@@ -33,7 +33,7 @@
 namespace qpid {
 namespace framing {
 
-class AMQFrame : public AMQDataBlock
+class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock
 {
   public:
     QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
@@ -59,6 +59,11 @@ class AMQFrame : public AMQDataBlock
         return boost::polymorphic_downcast<const T*>(getBody());
     }
 
+    /**
+     * Replace this frame's body with a deep copy of the body currently referenced.
+     */
+    QPID_COMMON_EXTERN void cloneBody();
+
     QPID_COMMON_EXTERN void encode(Buffer& buffer) const; 
     QPID_COMMON_EXTERN bool decode(Buffer& buffer); 
     QPID_COMMON_EXTERN uint32_t encodedSize() const;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@
 namespace qpid {
 namespace framing {
 
-class AMQHeaderBody :  public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQHeaderBody :  public AMQBody
 {
     template <class T> struct OptProps { boost::optional<T> props; };
     template <class Base, class T>
@@ -58,7 +58,7 @@ class AMQHeaderBody :  public AMQBody
         }
             else
                 return Base::decode(buffer, size, type);
-        }        
+        }
         void print(std::ostream& out) const {
             const boost::optional<T>& p=this->OptProps<T>::props;
             if (p) out << *p;
@@ -77,7 +77,7 @@ class AMQHeaderBody :  public AMQBody
     typedef  PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
 
     Properties properties;
-    
+
 public:
 
     inline uint8_t type() const { return HEADER_BODY; }
@@ -99,6 +99,10 @@ public:
         return properties.OptProps<T>::props.get_ptr();
     }
 
+    template <class T> void erase() {
+        properties.OptProps<T>::props.reset();
+    }
+
     boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
 namespace qpid {
 namespace framing {
 
-class AMQHeartbeatBody :  public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQHeartbeatBody :  public AMQBody
 {
 public:
     QPID_COMMON_EXTERN virtual ~AMQHeartbeatBody();

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp Fri Oct 21 14:42:12 2011
@@ -129,7 +129,7 @@ FieldTable::ValuePtr FieldTable::get(con
 namespace {
     template <class T> T default_value() { return T(); }
     template <> int default_value<int>() { return 0; }
-    template <> uint64_t default_value<uint64_t>() { return 0; }
+  //template <> uint64_t default_value<uint64_t>() { return 0; }
 }
 
 template <class T>
@@ -198,10 +198,12 @@ void FieldTable::encode(Buffer& buffer) 
 
 void FieldTable::decode(Buffer& buffer){
     clear();
+    if (buffer.available() < 4)
+        throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
     uint32_t len = buffer.getLong();
     if (len) {
         uint32_t available = buffer.available();
-        if (available < len)
+        if ((available < len) || (available < 4))
             throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
         uint32_t count = buffer.getLong();
         uint32_t leftover = available - len;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp Fri Oct 21 14:42:12 2011
@@ -49,6 +49,9 @@ void List::encode(Buffer& buffer) const
 void List::decode(Buffer& buffer)
 {
     values.clear();
+    if (buffer.available() < 4)
+        throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+                                                " 4 bytes but only " << buffer.available() << " available"));
     uint32_t size = buffer.getLong();
     uint32_t available = buffer.available();
     if (available < size) {
@@ -56,6 +59,9 @@ void List::decode(Buffer& buffer)
                                                 << size << " bytes but only " << available << " available"));
     }
     if (size) {
+        if (buffer.available() < 4)
+            throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+                                                    " 4 bytes but only " << buffer.available() << " available"));
         uint32_t count = buffer.getLong();        
         for (uint32_t i = 0; i < count; i++) {
             ValuePtr value(new FieldValue);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQBody.h"
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h Fri Oct 21 14:42:12 2011
@@ -37,7 +37,7 @@ namespace framing {
  */
 class SendContent
 {
-    mutable FrameHandler& handler;
+    FrameHandler& handler;
     const uint16_t maxFrameSize;
     uint expectedFrameCount;
     uint frameCount;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h Fri Oct 21 14:42:12 2011
@@ -32,7 +32,7 @@ namespace qpid {
 namespace framing {
 
 /** Message content */
-class TransferContent : public MethodContent
+class QPID_COMMON_CLASS_EXTERN TransferContent : public MethodContent
 {
     AMQHeaderBody header;
     std::string data;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp Fri Oct 21 14:42:12 2011
@@ -59,7 +59,9 @@ void Uuid::clear() {
 
 // Force int 0/!0 to false/true; avoids compile warnings.
 bool Uuid::isNull() const {
-    return !!uuid_is_null(data());
+    // This const_cast is for Solaris, whose uuid_is_null
+    // takes a non-const argument.
+    return !!uuid_is_null(const_cast<uint8_t*>(data()));
 }
 
 void Uuid::encode(Buffer& buf) const {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@
 #include "qpid/memory.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Time.h"
+#include "qpid/DisableExceptionLogging.h"
 #include <boost/pool/detail/singleton.hpp>
 #include <boost/bind.hpp>
 #include <boost/function.hpp>
@@ -48,11 +49,16 @@ Logger& Logger::instance() {
 }
 
 Logger::Logger() : flags(0) {
+    // Disable automatic logging in Exception constructors to avoid
+    // re-entrant use of logger singleton if there is an error in
+    // option parsing.
+    DisableExceptionLogging del;
+
     // Initialize myself from env variables so all programs
     // (e.g. tests) can use logging even if they don't parse
     // command line args.
     Options opts("");
-    opts.parse(0, 0);           
+    opts.parse(0, 0);
     configure(opts);
 }
 
@@ -73,8 +79,12 @@ void Logger::log(const Statement& s, con
     std::ostringstream os;
     if (!prefix.empty())
         os << prefix << ": ";
-    if (flags&TIME) 
-		qpid::sys::outputFormattedNow(os);
+    if (flags&TIME) {
+        if (flags&HIRES)
+            qpid::sys::outputHiresNow(os);
+        else
+		    qpid::sys::outputFormattedNow(os);
+    }
     if (flags&LEVEL)
         os << LevelTraits::name(s.level) << " ";
     if (flags&THREAD)
@@ -123,7 +133,8 @@ int Logger::format(const Options& opts) 
         bitIf(opts.time, TIME) |
         bitIf(opts.source, (FILE|LINE)) |
         bitIf(opts.function, FUNCTION) |
-        bitIf(opts.thread, THREAD);
+        bitIf(opts.thread, THREAD) |
+        bitIf(opts.hiresTs, HIRES);
     format(flags);
     return flags;
 }
@@ -140,7 +151,7 @@ void Logger::configure(const Options& op
     Options o(opts);
     if (o.trace)
         o.selectors.push_back("trace+");
-    format(o); 
+    format(o);
     select(Selector(o));
     setPrefix(opts.prefix);
     options.sinkOptions->setup(this);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp Fri Oct 21 14:42:12 2011
@@ -38,6 +38,7 @@ Options::Options(const std::string& argv
     thread(false),
     source(false),
     function(false),
+    hiresTs(false),
     trace(false),
     sinkOptions (SinkOptions::create(argv0_))
 {
@@ -65,6 +66,7 @@ Options::Options(const std::string& argv
         ("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
         ("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
         ("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
+        ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use unformatted hi-res timestamp in log messages")
         ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
         ;
     add(*sinkOptions);
@@ -80,6 +82,7 @@ Options::Options(const Options &o) :
     thread(o.thread),
     source(o.source),
     function(o.function),
+    hiresTs(o.hiresTs),
     trace(o.trace),
     prefix(o.prefix),
     sinkOptions (SinkOptions::create(o.argv0))
@@ -97,6 +100,7 @@ Options& Options::operator=(const Option
         thread = x.thread;
         source = x.source;
         function = x.function;
+        hiresTs = x.hiresTs;
         trace = x.trace;
         prefix = x.prefix;
         *sinkOptions = *x.sinkOptions;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp Fri Oct 21 14:42:12 2011
@@ -27,8 +27,6 @@ namespace qpid {
 namespace log {
 
 namespace {
-using namespace std;
-
 struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
 
 const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -39,7 +37,7 @@ std::string quote(const std::string& str
     if (n==0) return str;
     std::string ret;
     ret.reserve(str.size()+2*n); // Avoid extra allocations.
-    for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+    for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
         if (nonPrint(*i)) {
             ret.push_back('\\');
             ret.push_back('x');
@@ -50,7 +48,6 @@ std::string quote(const std::string& str
     }
     return ret;
 }
-
 }
 
 void Statement::log(const std::string& message) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp Fri Oct 21 14:42:12 2011
@@ -180,7 +180,7 @@ qpid::log::SinkOptions& SinkOptions::ope
 }
 
 void SinkOptions::detached(void) {
-    if (logToStderr && !logToStdout && !logToSyslog) {
+    if (logToStderr && !logToStdout && !logToSyslog && logFile.empty()) {
         logToStderr = false;
         logToSyslog = true;
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp Fri Oct 21 14:42:12 2011
@@ -53,7 +53,7 @@ static int eventTypes[qpid::log::LevelTr
 
 class EventLogOutput : public qpid::log::Logger::Output {
 public:
-    EventLogOutput(const std::string& sourceName) : logHandle(0)
+    EventLogOutput(const std::string& /*sourceName*/) : logHandle(0)
     {
         logHandle = OpenEventLog(0, "Application");
     }
@@ -83,7 +83,7 @@ private:
     HANDLE logHandle;
 };
 
-SinkOptions::SinkOptions(const std::string& argv0)
+SinkOptions::SinkOptions(const std::string& /*argv0*/)
     : qpid::log::SinkOptions(),
       logToStderr(true),
       logToStdout(false),

Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h Fri Oct 21 14:42:12 2011
@@ -26,7 +26,7 @@ namespace qpid {
 namespace log {
 namespace windows {
 
-struct SinkOptions : public qpid::log::SinkOptions {
+struct QPID_COMMON_CLASS_EXTERN SinkOptions : public qpid::log::SinkOptions {
     QPID_COMMON_EXTERN SinkOptions(const std::string& argv0);
     virtual ~SinkOptions() {}
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp Fri Oct 21 14:42:12 2011
@@ -31,6 +31,7 @@
 #include <qpid/broker/Message.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/types/Variant.h"
@@ -74,6 +75,18 @@ namespace {
         }
         return n2;
     }
+
+struct ScopedManagementContext
+{
+    ScopedManagementContext(const qpid::broker::ConnectionState* context)
+    {
+        setManagementExecutionContext(context);
+    }
+    ~ScopedManagementContext()
+    {
+        setManagementExecutionContext(0);
+    }
+};
 }
 
 
@@ -535,6 +548,7 @@ void ManagementAgent::sendBufferLH(Buffe
     dp->setRoutingKey(routingKey);
 
     msg->getFrames().append(content);
+    msg->setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -600,7 +614,7 @@ void ManagementAgent::sendBufferLH(const
     props->setAppId("qmf2");
 
     for (i = headers.begin(); i != headers.end(); ++i) {
-        msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+        msg->insertCustomProperty(i->first, i->second.asString());
     }
 
     DeliveryProperties* dp =
@@ -608,9 +622,10 @@ void ManagementAgent::sendBufferLH(const
     dp->setRoutingKey(routingKey);
     if (ttl_msec) {
         dp->setTtl(ttl_msec);
-        msg->setTimestamp(broker->getExpiryPolicy());
+        msg->computeExpiration(broker->getExpiryPolicy());
     }
     msg->getFrames().append(content);
+    msg->setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -2237,6 +2252,7 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
     const framing::FieldTable *headers = msg.getApplicationHeaders();
     if (headers && msg.getAppId() == "qmf2")
     {
@@ -2740,200 +2756,14 @@ void ManagementAgent::debugSnapshot(cons
                 title << ": new objects" << dumpVector(newManagementObjects));
 }
 
+
 Variant::Map ManagementAgent::toMap(const FieldTable& from)
 {
     Variant::Map map;
-
-    for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
-        const string& key(iter->first);
-        const FieldTable::ValuePtr& val(iter->second);
-
-        map[key] = toVariant(val);
-    }
-
+    qpid::amqp_0_10::translate(from, map);
     return map;
 }
 
-Variant::List ManagementAgent::toList(const List& from)
-{
-    Variant::List _list;
-
-    for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
-        const List::ValuePtr& val(*iter);
-
-        _list.push_back(toVariant(val));
-    }
-
-    return _list;
-}
-
-qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
-{
-    qpid::framing::FieldTable ft;
-
-    for (Variant::Map::const_iterator iter = from.begin();
-         iter != from.end();
-         iter++) {
-        const string& key(iter->first);
-        const Variant& val(iter->second);
-
-        ft.set(key, toFieldValue(val));
-    }
-
-    return ft;
-}
-
-
-List ManagementAgent::fromList(const Variant::List& from)
-{
-    List fa;
-
-    for (Variant::List::const_iterator iter = from.begin();
-         iter != from.end();
-         iter++) {
-        const Variant& val(*iter);
-
-        fa.push_back(toFieldValue(val));
-    }
-
-    return fa;
-}
-
-
-boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
-{
-
-    switch(in.getType()) {
-
-    case types::VAR_VOID:   return boost::shared_ptr<FieldValue>(new VoidValue());
-    case types::VAR_BOOL:   return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
-    case types::VAR_UINT8:  return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
-    case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
-    case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
-    case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
-    case types::VAR_INT8:   return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
-    case types::VAR_INT16:  return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
-    case types::VAR_INT32:  return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
-    case types::VAR_INT64:  return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
-    case types::VAR_FLOAT:  return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
-    case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
-    case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
-    case types::VAR_UUID:   return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
-    case types::VAR_MAP:    return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
-    case types::VAR_LIST:   return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
-    }
-
-    QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
-    return boost::shared_ptr<FieldValue>(new VoidValue());
-}
-
-// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
-Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
-{
-    const string iso885915("iso-8859-15");
-    const string utf8("utf8");
-    const string utf16("utf16");
-    //const string binary("binary");
-    const string amqp0_10_binary("amqp0-10:binary");
-    //const string amqp0_10_bit("amqp0-10:bit");
-    const string amqp0_10_datetime("amqp0-10:datetime");
-    const string amqp0_10_struct("amqp0-10:struct");
-    Variant out;
-
-    //based on AMQP 0-10 typecode, pick most appropriate variant type
-    switch (in->getType()) {
-        //Fixed Width types:
-    case 0x00: //bin8
-    case 0x01: out.setEncoding(amqp0_10_binary); // int8
-    case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;  //uint8
-    case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;  // 
-        // case 0x04: break; //TODO: iso-8859-15 char  // char
-    case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;  // bool int8
-
-    case 0x10: out.setEncoding(amqp0_10_binary);  // bin16
-    case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;  // int16
-    case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;  //uint16
-
-    case 0x20: out.setEncoding(amqp0_10_binary);   // bin32
-    case 0x21: out = in->getIntegerValue<int32_t, 4>(); break;  // int32
-    case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
-
-    case 0x23: out = in->get<float>(); break;  // float(32)
-
-        // case 0x27: break; //TODO: utf-32 char
-
-    case 0x30: out.setEncoding(amqp0_10_binary); // bin64
-    case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
-
-    case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
-    case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break;  //uint64
-    case 0x33: out = in->get<double>(); break;  // double
-
-    case 0x48: // uuid
-        {
-            unsigned char data[16];
-            in->getFixedWidthValue<16>(data);
-            out = qpid::types::Uuid(data);
-        } break;
-
-        //TODO: figure out whether and how to map values with codes 0x40-0xd8
-
-    case 0xf0: break;//void, which is the default value for Variant
-        // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
-
-        //Variable Width types:
-        //strings:
-    case 0x80: // str8
-    case 0x90: // str16
-    case 0xa0: // str32
-        out = in->get<string>();
-        out.setEncoding(amqp0_10_binary);
-        break;
-
-    case 0x84: // str8
-    case 0x94: // str16
-        out = in->get<string>();
-        out.setEncoding(iso885915);
-        break;
-
-    case 0x85: // str8
-    case 0x95: // str16
-        out = in->get<string>();
-        out.setEncoding(utf8);
-        break;
-
-    case 0x86: // str8
-    case 0x96: // str16
-        out = in->get<string>();
-        out.setEncoding(utf16);
-        break;
-
-    case 0xab:  // str32
-        out = in->get<string>();
-        out.setEncoding(amqp0_10_struct);
-        break;
-
-    case 0xa8:  // map
-        out = ManagementAgent::toMap(in->get<FieldTable>());
-        break;
-
-    case 0xa9: // list of variant types
-        out = ManagementAgent::toList(in->get<List>());
-        break;
-        //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
-        // out = Variant::List();
-        // translate<Array>(in, out.asList(), &toVariant);
-        // break;
-
-      default:
-          //error?
-          QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
-          break;
-    }
-
-    return out;
-}
-
 
 // Build up a list of the current set of deleted objects that are pending their
 // next (last) publish-ment.
@@ -3085,3 +2915,21 @@ bool ManagementAgent::moveDeletedObjects
     }
     return !deleteList.empty();
 }
+
+namespace qpid {
+namespace management {
+
+namespace {
+QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+}
+
+void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+{
+    executionContext = ctxt;
+}
+const qpid::broker::ConnectionState* getManagementExecutionContext()
+{
+    return executionContext;
+}
+
+}}

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1072051-1187351

Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h Fri Oct 21 14:42:12 2011
@@ -41,6 +41,9 @@
 #include <map>
 
 namespace qpid {
+namespace broker {
+class ConnectionState;
+}
 namespace management {
 
 class ManagementAgent
@@ -142,13 +145,7 @@ public:
     const framing::Uuid& getUuid() const { return uuid; }
     void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
 
-    // TODO: remove these when Variant API moved into common library.
     static types::Variant::Map toMap(const framing::FieldTable& from);
-    static framing::FieldTable fromMap(const types::Variant::Map& from);
-    static types::Variant::List toList(const framing::List& from);
-    static framing::List fromList(const types::Variant::List& from);
-    static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
-    static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
 
     // For Clustering: management objects that have been marked as
     // "deleted", but are waiting for their last published object
@@ -422,6 +419,8 @@ private:
     void debugSnapshot(const char* title);
 };
 
+void setManagementExecutionContext(const qpid::broker::ConnectionState*);
+const qpid::broker::ConnectionState* getManagementExecutionContext();
 }}
-            
+
 #endif  /*!_ManagementAgent_*/

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1072051-1187351

Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp Fri Oct 21 14:42:12 2011
@@ -151,7 +151,7 @@ bool AddressParser::readValueIfExists(Va
 bool AddressParser::readString(std::string& value, char delimiter)
 {
     if (readChar(delimiter)) {
-        std::string::size_type start = current++;
+        std::string::size_type start = current;
         while (!eos()) {
             if (input.at(current) == delimiter) {
                 if (current > start) {
@@ -201,7 +201,8 @@ bool AddressParser::readSimpleValue(Vari
 {
     std::string s;
     if (readWord(s)) {
-        value.parse(s);        
+        value.parse(s);
+        if (value.getType() == VAR_STRING) value.setEncoding("utf8");
         return true;
     } else {
         return false;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp Fri Oct 21 14:42:12 2011
@@ -37,6 +37,16 @@ Duration operator*(uint64_t multiplier, 
     return Duration(duration.getMilliseconds() * multiplier);
 }
 
+bool operator==(const Duration& a, const Duration& b)
+{
+    return a.getMilliseconds() == b.getMilliseconds();
+}
+
+bool operator!=(const Duration& a, const Duration& b)
+{
+    return a.getMilliseconds() != b.getMilliseconds();
+}
+
 const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
 const Duration Duration::IMMEDIATE(0);
 const Duration Duration::SECOND(1000);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp Fri Oct 21 14:42:12 2011
@@ -21,6 +21,7 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include <qpid/Exception.h>
 #include <boost/format.hpp>
 
 namespace qpid {
@@ -115,7 +116,11 @@ template <class C> struct MessageCodec
     static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
     {
         checkEncoding(message, encoding);
-        C::decode(message.getContent(), object);
+        try {
+            C::decode(message.getContent(), object);
+        } catch (const qpid::Exception &ex) {
+            throw EncodingException(ex.what());
+        }
     }
 
     static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp Fri Oct 21 14:42:12 2011
@@ -39,7 +39,8 @@ Session& Session::operator=(const Sessio
 void Session::commit() { impl->commit(); }
 void Session::rollback() { impl->rollback(); }
 void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
-void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); }
+void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); }
 void Session::reject(Message& m) { impl->reject(m); }
 void Session::release(Message& m) { impl->release(m); }
 void Session::close() { impl->close(); }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h Fri Oct 21 14:42:12 2011
@@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid:
     virtual void commit() = 0;
     virtual void rollback() = 0;
     virtual void acknowledge(bool sync) = 0;
-    virtual void acknowledge(Message&) = 0;
+    virtual void acknowledge(Message&, bool cumulative) = 0;
     virtual void reject(Message&) = 0;
     virtual void release(Message&) = 0;
     virtual void close() = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp Fri Oct 21 14:42:12 2011
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDe
 void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
 {
     boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
-    FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
-    headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
-    headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+    msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+    msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+    msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
     route(msg);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp Fri Oct 21 14:42:12 2011
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueE
         } else {
             queue->setPosition(seqno1);  
 
-            FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
-            headers.erase(REPLICATION_TARGET_QUEUE);
-            headers.erase(REPLICATION_EVENT_SEQNO);
-            headers.erase(REPLICATION_EVENT_TYPE);
-            headers.erase(QUEUE_MESSAGE_POSITION);
+            msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+            msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
             msg.deliverTo(queue);
             QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
             if (mgmtExchange != 0) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h Fri Oct 21 14:42:12 2011
@@ -54,7 +54,7 @@ struct QueueEntry {
     QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
         : queueId(id), tplStatus(tpl), xid(x) {}
 
-    bool operator==(const QueueEntry& rhs) {
+    bool operator==(const QueueEntry& rhs) const {
         if (queueId != rhs.queueId) return false;
         if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
         return xid == rhs.xid;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h Fri Oct 21 14:42:12 2011
@@ -41,7 +41,7 @@ namespace sys {
  * doOutput is called in another.
  */
 
-class AggregateOutput : public OutputTask, public OutputControl
+class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
 {
     typedef std::deque<OutputTask*> TaskList;
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h Fri Oct 21 14:42:12 2011
@@ -64,8 +64,8 @@ public:
     // deletes. To correctly manage heaps when needed, the allocate and
     // delete should both be done from the same class/library.
     QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s,
-                                   std::string hostname,
-                                   uint16_t port,
+                                   const std::string& hostname,
+                                   const std::string& port,
                                    ConnectedCallback connCb,
                                    FailedCallback failCb);
     virtual void start(boost::shared_ptr<Poller> poller) = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h Fri Oct 21 14:42:12 2011
@@ -57,7 +57,7 @@ class AsynchIOHandler : public OutputCon
     QPID_COMMON_EXTERN ~AsynchIOHandler();
     QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
 
-    QPID_COMMON_EXTERN void setClient() { isClient = true; }
+    QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
 
     // Output side
     QPID_COMMON_EXTERN void abort();

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h Fri Oct 21 14:42:12 2011
@@ -22,7 +22,12 @@
  *
  */
 
-#if defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
+// Have to check for clang before gcc as clang pretends to be gcc too
+#if defined( __clang__ )
+// Clang doesn't support atomic builtins for 64-bit values, so use the slow (mutex-based) versions
+#include "qpid/sys/AtomicValue_mutex.h"
+
+#elif defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
 // Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform.
 #include "qpid/sys/AtomicValue_gcc.h"
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,9 @@ class AtomicValue
   public:
     AtomicValue(T init=0) : value(init) {}
 
+    // Not atomic. Don't call concurrently with atomic ops.
+    AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; }
+
     // Update and return new value.
     inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
     inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
@@ -54,11 +57,11 @@ class AtomicValue
     /** If current value == testval then set to newval. Returns the old value. */
     T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
 
-    /** If current value == testval then set to newval. Returns true if the swap was performed. */    
+    /** If current value == testval then set to newval. Returns true if the swap was performed. */
     bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
 
     T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
-        
+
   private:
     T value;
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp Fri Oct 21 14:42:12 2011
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
 
 bool isClusterSafe() { return !inCluster || inContext; }
 
-bool isCluster() { return inCluster; }
-
 void assertClusterSafe()  {
     if (!isClusterSafe()) {
         QPID_LOG(critical, "Modified cluster state outside of cluster context");
@@ -53,6 +51,16 @@ ClusterSafeScope::~ClusterSafeScope() {
     inContext = save;
 }
 
+ClusterUnsafeScope::ClusterUnsafeScope()  {
+    save = inContext;
+    inContext = false;
+}
+
+ClusterUnsafeScope::~ClusterUnsafeScope() {
+    assert(!inContext);
+    inContext = save;
+}
+
 void enableClusterSafe() { inCluster = true; }
 
 }} // namespace qpid::sys

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h Fri Oct 21 14:42:12 2011
@@ -52,14 +52,9 @@ QPID_COMMON_EXTERN void assertClusterSaf
  */
 QPID_COMMON_EXTERN bool isClusterSafe();
 
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
 /**
- * Base class for classes that encapsulate state which is replicated
- * to all members of a cluster. Acts as a marker for clustered state
- * and provides functions to assist detecting bugs in cluster
- * behavior.
+ *  Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
+ *  to previous value in destructor.
  */
 class ClusterSafeScope {
   public:
@@ -70,6 +65,18 @@ class ClusterSafeScope {
 };
 
 /**
+ *  Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
+ *  to previous value in destructor.
+ */
+class ClusterUnsafeScope {
+  public:
+    QPID_COMMON_EXTERN ClusterUnsafeScope();
+    QPID_COMMON_EXTERN ~ClusterUnsafeScope();
+  private:
+    bool save;
+};
+
+/**
  * Enable cluster-safe assertions. By default they are no-ops.
  * Called by cluster code.
  */

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h Fri Oct 21 14:42:12 2011
@@ -43,6 +43,12 @@ public:
     CopyOnWriteArray() {}
     CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
 
+    bool empty()
+    {
+        Mutex::ScopedLock l(lock);
+        return array ? array->empty() : true;
+    }
+
     void add(T& t)
     {
         Mutex::ScopedLock l(lock);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,8 @@
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 #include <algorithm>
-#include <vector>
+#include <deque>
+#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05:
 
 namespace qpid {
 namespace sys {
@@ -44,7 +45,7 @@ class Poller;
 template <class T>
 class PollableQueue {
   public:
-    typedef std::vector<T> Batch;
+    typedef std::deque<T> Batch;
     typedef T value_type;
 
     /**
@@ -68,11 +69,11 @@ class PollableQueue {
                   const boost::shared_ptr<sys::Poller>& poller);
 
     ~PollableQueue();
-    
+
     /** Push a value onto the queue. Thread safe */
     void push(const T& t);
 
-    /** Start polling. */ 
+    /** Start polling. */
     void start();
 
     /** Stop polling and wait for the current callback, if any, to complete. */
@@ -90,14 +91,14 @@ class PollableQueue {
      * ensure clean shutdown with no events left on the queue.
      */
     void shutdown();
-    
+
   private:
     typedef sys::Monitor::ScopedLock ScopedLock;
     typedef sys::Monitor::ScopedUnlock ScopedUnlock;
 
     void dispatch(PollableCondition& cond);
     void process();
-    
+
     mutable sys::Monitor lock;
     Callback callback;
     PollableCondition condition;
@@ -107,7 +108,7 @@ class PollableQueue {
 };
 
 template <class T> PollableQueue<T>::PollableQueue(
-    const Callback& cb, const boost::shared_ptr<sys::Poller>& p) 
+    const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
   : callback(cb),
     condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
     stopped(true)
@@ -151,7 +152,7 @@ template <class T> void PollableQueue<T>
             putBack = callback(batch);
         }
         // put back unprocessed items.
-        queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); 
+        queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
         batch.clear();
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h Fri Oct 21 14:42:12 2011
@@ -120,7 +120,7 @@ class PollerHandle {
     friend struct Poller::Event;
 
     PollerHandlePrivate* const impl;
-    QPID_COMMON_EXTERN virtual void processEvent(Poller::EventType) {};
+    QPID_COMMON_INLINE_EXTERN virtual void processEvent(Poller::EventType) {};
 
 public:
     QPID_COMMON_EXTERN PollerHandle(const IOHandle& h);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h Fri Oct 21 14:42:12 2011
@@ -39,11 +39,10 @@ class ProtocolFactory : public qpid::Sha
 
     virtual ~ProtocolFactory() = 0;
     virtual uint16_t getPort() const = 0;
-    virtual std::string getHost() const = 0;
     virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
-        const std::string& host, int16_t port,
+        const std::string& host, const std::string& port,
         ConnectionCodec::Factory* codec,
         ConnectFailedCallback failed) = 0;
     virtual bool supports(const std::string& /*capability*/) { return false; }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -31,7 +31,6 @@
 #include "qpid/sys/SecuritySettings.h"
 
 #include <boost/bind.hpp>
-#include <boost/lexical_cast.hpp>
 #include <memory>
 
 #include <netdb.h>
@@ -212,10 +211,9 @@ void RdmaIOHandler::readbuff(Rdma::Async
     if (readError) {
         return;
     }
-    size_t decoded = 0;
     try {
         if (codec) {
-            decoded = codec->decode(buff->bytes(), buff->dataCount());
+            (void) codec->decode(buff->bytes(), buff->dataCount());
         }else{
             // Need to start protocol processing
             initProtocolIn(buff);
@@ -230,9 +228,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
 void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
     framing::Buffer in(buff->bytes(), buff->dataCount());
     framing::ProtocolInitiation protocolInit;
-    size_t decoded = 0;
     if (protocolInit.decode(in)) {
-        decoded = in.getPosition();
         QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
 
         codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -254,10 +250,9 @@ class RdmaIOProtocolFactory : public Pro
   public:
     RdmaIOProtocolFactory(int16_t port, int backlog);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+    void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
 
     uint16_t getPort() const;
-    string getHost() const;
 
   private:
     bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
@@ -347,18 +342,7 @@ uint16_t RdmaIOProtocolFactory::getPort(
     return listeningPort; // Immutable no need for lock.
 }
 
-string RdmaIOProtocolFactory::getHost() const {
-    //return listener.getSockname();
-    return "";
-}
-
 void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
-    ::sockaddr_in sin;
-
-    sin.sin_family = AF_INET;
-    sin.sin_port = htons(listeningPort);
-    sin.sin_addr.s_addr = INADDR_ANY;
-
     listener.reset(
         new Rdma::Listener(
             Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
@@ -387,7 +371,7 @@ void RdmaIOProtocolFactory::connected(Po
 
 void RdmaIOProtocolFactory::connect(
     Poller::shared_ptr poller,
-    const std::string& host, int16_t port,
+    const std::string& host, const std::string& port,
     ConnectionCodec::Factory* f,
     ConnectFailedCallback failed)
 {
@@ -399,7 +383,7 @@ void RdmaIOProtocolFactory::connect(
             boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
             boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
 
-    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+    SocketAddress sa(host, port);
     c->start(poller, sa);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h Fri Oct 21 14:42:12 2011
@@ -33,21 +33,21 @@ namespace sys {
 class Duration;
 class SocketAddress;
 
-class Socket : public IOHandle
+class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
 {
 public:
     /** Create a socket wrapper for descriptor. */
     QPID_COMMON_EXTERN Socket();
 
-    /** Set timeout for read and write */
-    void setTimeout(const Duration& interval) const;
+    /** Create a new Socket which is the same address family as this one */
+    QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
 
     /** Set socket non blocking */
     void setNonblocking() const;
 
     QPID_COMMON_EXTERN void setTcpNoDelay() const;
 
-    QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
+    QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
     QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
 
     QPID_COMMON_EXTERN void close() const;
@@ -57,19 +57,9 @@ public:
      *@param backlog maximum number of pending connections.
      *@return The bound port.
      */
-    QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
+    QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
     QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
 
-    /** Returns the "socket name" ie the address bound to
-     * the near end of the socket
-     */
-    QPID_COMMON_EXTERN std::string getSockname() const;
-
-    /** Returns the "peer name" ie the address bound to
-     * the remote end of the socket
-     */
-    std::string getPeername() const;
-
     /**
      * Returns an address (host and port) for the remote end of the
      * socket
@@ -84,16 +74,13 @@ public:
     /**
      * Returns the full address of the connection: local and remote host and port.
      */
-    QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
-    QPID_COMMON_EXTERN uint16_t getLocalPort() const;
-    uint16_t getRemotePort() const;
+    QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
 
     /**
      * Returns the error code stored in the socket.  This may be used
      * to determine the result of a non-blocking connect.
      */
-    int getError() const;
+    QPID_COMMON_EXTERN int getError() const;
 
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
@@ -108,8 +95,13 @@ private:
     /** Create socket */
     void createSocket(const SocketAddress&) const;
 
+public:
+    /** Construct socket with existing handle */
     Socket(IOHandlePrivate*);
-    mutable std::string connectname;
+
+protected:
+    mutable std::string localname;
+    mutable std::string peername;
     mutable bool nonblocking;
     mutable bool nodelay;
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@
 #include <string>
 
 struct addrinfo;
+struct sockaddr;
 
 namespace qpid {
 namespace sys {
@@ -41,12 +42,19 @@ public:
     QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
     QPID_COMMON_EXTERN ~SocketAddress();
 
-    std::string asString() const;
+    QPID_COMMON_EXTERN bool nextAddress();
+    QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
+
+    QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
+    QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
+    
 
 private:
     std::string host;
     std::string port;
     mutable ::addrinfo* addrInfo;
+    mutable ::addrinfo* currentAddrInfo;
 };
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -25,6 +25,8 @@
 #include "qpid/sys/ssl/check.h"
 #include "qpid/sys/ssl/util.h"
 #include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/ssl/SslIo.h"
 #include "qpid/sys/ssl/SslSocket.h"
 #include "qpid/broker/Broker.h"
@@ -37,15 +39,19 @@
 namespace qpid {
 namespace sys {
 
+using namespace qpid::sys::ssl;
+
 struct SslServerOptions : ssl::SslOptions
 {
     uint16_t port;
     bool clientAuth;
     bool nodict;
+    bool multiplex;
 
     SslServerOptions() : port(5671),
                          clientAuth(false),
-                         nodict(false)
+                         nodict(false),
+                         multiplex(false)
     {
         addOptions()
             ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -56,29 +62,37 @@ struct SslServerOptions : ssl::SslOption
     }
 };
 
-class SslProtocolFactory : public ProtocolFactory {
+template <class T>
+class SslProtocolFactoryTmpl : public ProtocolFactory {
+  private:
+
+    typedef SslAcceptorTmpl<T> SslAcceptor;
+
     const bool tcpNoDelay;
-    qpid::sys::ssl::SslSocket listener;
+    T listener;
     const uint16_t listeningPort;
-    std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+    std::auto_ptr<SslAcceptor> acceptor;
     bool nodict;
 
   public:
-    SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+    void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  boost::function2<void, int, std::string> failed);
 
     uint16_t getPort() const;
-    std::string getHost() const;
     bool supports(const std::string& capability);
 
   private:
-    void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*,
+    void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
                      bool isClient);
 };
 
+typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
+typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
+
+
 // Static instance to initialise plugin
 static struct SslPlugin : public Plugin {
     SslServerOptions options;
@@ -87,24 +101,48 @@ static struct SslPlugin : public Plugin 
 
     ~SslPlugin() { ssl::shutdownNSS(); }
 
-    void earlyInitialize(Target&) {
+    void earlyInitialize(Target& target) {
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        if (broker && !options.certDbPath.empty()) {
+            const broker::Broker::Options& opts = broker->getOptions();
+
+            if (opts.port == options.port && // AMQP & AMQPS ports are the same
+                opts.port != 0) {
+                // The presence of this option is used to signal to the TCP
+                // plugin not to start listening on the shared port. The actual
+                // value cannot be configured through the command line or config
+                // file (other than by setting the ports to the same value)
+                // because we are only adding it after option parsing.
+                options.multiplex = true;
+                options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+            }
+        }
     }
     
     void initialize(Target& target) {
+        QPID_LOG(trace, "Initialising SSL plugin");
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker
         if (broker) {
             if (options.certDbPath.empty()) {
-                QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");                    
+                QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
             } else {
                 try {
                     ssl::initNSS(options, true);
                     
                     const broker::Broker::Options& opts = broker->getOptions();
-                    ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
-                                                                                opts.connectionBacklog,
-                                                                                opts.tcpNoDelay));
-                    QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+
+                    ProtocolFactory::shared_ptr protocol(options.multiplex ?
+                        static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
+                                                  opts.connectionBacklog,
+                                                  opts.tcpNoDelay)) :
+                        static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
+                                               opts.connectionBacklog,
+                                               opts.tcpNoDelay)));
+                    QPID_LOG(notice, "Listening for " <<
+                                     (options.multiplex ? "SSL or TCP" : "SSL") <<
+                                     " connections on TCP port " <<
+                                     protocol->getPort());
                     broker->registerProtocolFactory("ssl", protocol);
                 } catch (const std::exception& e) {
                     QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -114,13 +152,15 @@ static struct SslPlugin : public Plugin 
     }
 } sslPlugin;
 
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
+template <class T>
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
     tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
     nodict(options.nodict)
 {}
 
-void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
-                                          ConnectionCodec::Factory* f, bool isClient) {
+void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
+                    ConnectionCodec::Factory* f, bool isClient,
+                    bool tcpNoDelay, bool nodict) {
     qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
 
     if (tcpNoDelay) {
@@ -128,8 +168,10 @@ void SslProtocolFactory::established(Pol
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
     }
 
-    if (isClient)
+    if (isClient) {
         async->setClient();
+    }
+
     qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
                                  boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
                                  boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
@@ -142,25 +184,66 @@ void SslProtocolFactory::established(Pol
     aio->start(poller);
 }
 
-uint16_t SslProtocolFactory::getPort() const {
-    return listeningPort; // Immutable no need for lock.
+template <>
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+                                     ConnectionCodec::Factory* f, bool isClient) {
+    const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+    SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
 }
 
-std::string SslProtocolFactory::getHost() const {
-    return listener.getSockname();
+template <class T>
+uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
+    return listeningPort; // Immutable no need for lock.
 }
 
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
-                                     ConnectionCodec::Factory* fact) {
+template <class T>
+void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
+                                       ConnectionCodec::Factory* fact) {
     acceptor.reset(
-        new qpid::sys::ssl::SslAcceptor(listener,
-                           boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+        new SslAcceptor(listener,
+                        boost::bind(&SslProtocolFactoryTmpl<T>::established,
+                                    this, poller, _1, fact, false)));
     acceptor->start(poller);
 }
 
-void SslProtocolFactory::connect(
+template <>
+void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+                                        ConnectionCodec::Factory* f, bool isClient) {
+    const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+    if (sslSock) {
+        SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+        return;
+    }
+
+    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+
+    if (tcpNoDelay) {
+        s.setTcpNoDelay();
+        QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+    }
+
+    if (isClient) {
+        async->setClient();
+    }
+    AsynchIO* aio = AsynchIO::create
+      (s,
+       boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+       boost::bind(&AsynchIOHandler::eof, async, _1),
+       boost::bind(&AsynchIOHandler::disconnect, async, _1),
+       boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+       boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+       boost::bind(&AsynchIOHandler::idle, async, _1));
+
+    async->init(aio, 4);
+    aio->start(poller);
+}
+
+template <class T>
+void SslProtocolFactoryTmpl<T>::connect(
     Poller::shared_ptr poller,
-    const std::string& host, int16_t port,
+    const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
 {
@@ -171,9 +254,9 @@ void SslProtocolFactory::connect(
     // is no longer needed.
 
     qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
-    new qpid::sys::ssl::SslConnector (*socket, poller, host, port,
-                         boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true),
-                         failed);
+    new SslConnector(*socket, poller, host, port,
+                     boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
+                     failed);
 }
 
 namespace
@@ -181,6 +264,7 @@ namespace
 const std::string SSL = "ssl";
 }
 
+template <>
 bool SslProtocolFactory::supports(const std::string& capability)
 {
     std::string s = capability;
@@ -188,4 +272,12 @@ bool SslProtocolFactory::supports(const 
     return s == SSL;
 }
 
+template <>
+bool SslMuxProtocolFactory::supports(const std::string& capability)
+{
+    std::string s = capability;
+    transform(s.begin(), s.end(), s.begin(), tolower);
+    return s == SSL || s == "tcp";
+}
+
 }} // namespace qpid::sys

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h Fri Oct 21 14:42:12 2011
@@ -41,9 +41,9 @@ class StateMonitor : public Waitable
     struct Set : public std::bitset<MaxEnum + 1> {
         Set() {}
         Set(Enum s) { set(s); }
-        Set(Enum s, Enum t) { set(s).set(t); }
-        Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
-        Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+        Set(Enum s, Enum t) { std::bitset<MaxEnum + 1>::set(s).set(t); }
+        Set(Enum s, Enum t, Enum u) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u); }
+        Set(Enum s, Enum t, Enum u, Enum v) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u).set(v); }
     };
 
 
@@ -60,13 +60,13 @@ class StateMonitor : public Waitable
     operator Enum() const { return state; }
 
     /** @pre Caller holds a ScopedLock */
-    void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+    void waitFor(Enum s) { ScopedWait w(*this); while (s != state) wait(); }
     /** @pre Caller holds a ScopedLock */
-    void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+    void waitFor(Set s) { ScopedWait w(*this); while (!s.test(state)) wait(); }
     /** @pre Caller holds a ScopedLock */
-    void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+    void waitNot(Enum s) { ScopedWait w(*this); while (s == state) wait(); }
     /** @pre Caller holds a ScopedLock */
-    void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+    void waitNot(Set s) { ScopedWait w(*this); while (s.test(state)) wait(); }
     
   private:
     Enum state;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org