You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [5/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 20 18:59:30 2013
@@ -39,7 +39,11 @@ SessionHandler::SessionHandler(amqp_0_10
       proxy(out)
 {}
 
-SessionHandler::~SessionHandler() {}
+SessionHandler::~SessionHandler()
+{
+    if (session.get())
+        connection.getBroker().getSessionManager().forget(session->getId());
+}
 
 void SessionHandler::connectionException(
     framing::connection::CloseCode code, const std::string& msg)

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Sep 20 18:59:30 2013
@@ -79,7 +79,27 @@ void SessionState::addManagementObject()
     }
 }
 
+void SessionState::startTx() {
+    if (mgmtObject) { mgmtObject->inc_TxnStarts(); }
+}
+
+void SessionState::commitTx() {
+    if (mgmtObject) {
+        mgmtObject->inc_TxnCommits();
+        mgmtObject->inc_TxnCount();
+    }
+}
+
+void SessionState::rollbackTx() {
+    if (mgmtObject) {
+        mgmtObject->inc_TxnRejects();
+        mgmtObject->inc_TxnCount();
+    }
+}
+
 SessionState::~SessionState() {
+    if (mgmtObject != 0)
+        mgmtObject->debugStats("destroying");
     asyncCommandCompleter->cancel();
     semanticState.closed();
     if (mgmtObject != 0)

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 20 18:59:30 2013
@@ -127,6 +127,11 @@ class SessionState : public qpid::Sessio
     // belonging to inter-broker bridges
     void addManagementObject();
 
+    // transaction-related methods just to update statistics
+    void startTx();
+    void commitTx();
+    void rollbackTx();
+
   private:
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp Fri Sep 20 18:59:30 2013
@@ -80,3 +80,9 @@ System::System (string _dataDir, Broker*
     }
 }
 
+System::~System ()
+{
+    if (mgmtObject != 0)
+        mgmtObject->debugStats("destroying");
+}
+

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h Fri Sep 20 18:59:30 2013
@@ -45,6 +45,8 @@ class System : public management::Manage
 
     System (std::string _dataDir, Broker* broker = 0);
 
+    ~System ();
+
     management::ManagementObject::shared_ptr GetManagementObject(void) const
     { return mgmtObject; }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Sep 20 18:59:30 2013
@@ -333,7 +333,10 @@ bool TopicExchange::isBound(Queue::share
     return false;
 }
 
-TopicExchange::~TopicExchange() {}
+TopicExchange::~TopicExchange() {
+    if (mgmtExchange != 0)
+        mgmtExchange->debugStats("destroying");
+}
 
 const std::string TopicExchange::typeName("topic");
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h Fri Sep 20 18:59:30 2013
@@ -81,7 +81,7 @@ class TopicExchange : public virtual Exc
     };
 
 public:
-    static const std::string typeName;
+    QPID_BROKER_EXTERN static const std::string typeName;
 
     static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
  *
  */
 #include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TransactionObserver.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
 
 using std::bind1st;
@@ -28,54 +30,23 @@ using namespace qpid::broker;
 using qpid::framing::SequenceSet;
 using qpid::framing::SequenceNumber;
 
-TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
 
-void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
+    acked(_acked), unacked(_unacked)
 {
-    for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+    for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i)
+        ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last()));
 }
 
-void TxAccept::RangeOp::commit()
-{
-    for_each(range.start, range.end, bind(&DeliveryRecord::committed, _1));
-    for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
-}
-
-TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} 
-
-void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
-{
-    ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
-}
-
-void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
-{
-    std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
-}
-
-void TxAccept::RangeOps::commit()
-{
-    std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
-    //now remove if isRedundant():
-    if (!ranges.empty()) {
-        DeliveryRecords::iterator begin = ranges.front().range.start;
-        DeliveryRecords::iterator end = ranges.back().range.end;
-        DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
-        unacked.erase(removed, end);
-    }
-}
-
-TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : 
-    acked(_acked), unacked(_unacked), ops(unacked) 
-{
-    //populate the ops
-    acked.for_each(ops);
+void TxAccept::each(boost::function<void(DeliveryRecord&)> f) {
+    for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i)
+        for_each(i->start, i->end, f);
 }
 
 bool TxAccept::prepare(TransactionContext* ctxt) throw()
 {
     try{
-        ops.prepare(ctxt);
+        each(bind(&DeliveryRecord::dequeue, _1, ctxt));
         return true;
     }catch(const std::exception& e){
         QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -86,10 +57,19 @@ bool TxAccept::prepare(TransactionContex
     }
 }
 
-void TxAccept::commit() throw() 
+void TxAccept::commit() throw()
 {
     try {
-        ops.commit();
+        each(bind(&DeliveryRecord::committed, _1));
+        each(bind(&DeliveryRecord::setEnded, _1));
+        //now remove if isRedundant():
+        if (!ranges.empty()) {
+            DeliveryRecords::iterator begin = ranges.front().start;
+            DeliveryRecords::iterator end = ranges.back().end;
+            DeliveryRecords::iterator removed =
+                remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+            unacked.erase(removed, end);
+        }
     } catch (const std::exception& e) {
         QPID_LOG(error, "Failed to commit: " << e.what());
     } catch(...) {
@@ -98,3 +78,13 @@ void TxAccept::commit() throw() 
 }
 
 void TxAccept::rollback() throw() {}
+
+namespace {
+void callObserverDR(boost::shared_ptr<TransactionObserver> observer, DeliveryRecord& dr) {
+    observer->dequeue(dr.getQueue(), dr.getMessageId(), dr.getReplicationId());
+}
+} // namespace
+
+void TxAccept::callObserver(const ObserverPtr& observer) {
+    each(boost::bind(&callObserverDR, observer, _1));
+}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,58 +21,45 @@
 #ifndef _TxAccept_
 #define _TxAccept_
 
-#include <algorithm>
-#include <functional>
-#include <list>
 #include "qpid/framing/SequenceSet.h"
+#include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/TxOp.h"
+#include <boost/function.hpp>
+#include <algorithm>
+#include <functional>
+#include <list>
 
 namespace qpid {
-    namespace broker {
-        /**
-         * Defines the transactional behaviour for accepts received by
-         * a transactional channel.
-         */
-        class TxAccept : public TxOp {
-            struct RangeOp
-            {
-                AckRange range;
-    
-                RangeOp(const AckRange& r);
-                void prepare(TransactionContext* ctxt);
-                void commit();
-            };
-
-            struct RangeOps
-            {
-                std::vector<RangeOp> ranges;
-                DeliveryRecords& unacked;
-    
-                RangeOps(DeliveryRecords& u);
-
-                void operator()(framing::SequenceNumber start, framing::SequenceNumber end);
-                void prepare(TransactionContext* ctxt);
-                void commit();    
-            };
-
-            framing::SequenceSet acked;
-            DeliveryRecords& unacked;
-            RangeOps ops;
-
-        public:
-            /**
-             * @param acked a representation of the accumulation of
-             * acks received
-             * @param unacked the record of delivered messages
-             */
-            TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
-            virtual bool prepare(TransactionContext* ctxt) throw();
-            virtual void commit() throw();
-            virtual void rollback() throw();
-            virtual ~TxAccept(){}
-        };
-    }
+namespace broker {
+/**
+ * Defines the transactional behaviour for accepts received by
+ * a transactional channel.
+ */
+class TxAccept : public TxOp {
+    typedef std::vector<AckRange> AckRanges;
+    typedef boost::shared_ptr<TransactionObserver> ObserverPtr;
+
+    void each(boost::function<void(DeliveryRecord&)>);
+
+    framing::SequenceSet acked;
+    DeliveryRecords& unacked;
+    AckRanges ranges;
+
+  public:
+    /**
+     * @param acked a representation of the accumulation of
+     * acks received
+     * @param unacked the record of delivered messages
+     */
+    QPID_BROKER_EXTERN TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+    virtual bool prepare(TransactionContext* ctxt) throw();
+    virtual void commit() throw();
+    virtual void rollback() throw();
+    virtual void callObserver(const ObserverPtr&);
+    virtual ~TxAccept(){}
+};
+}
 }
 
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Sep 20 18:59:30 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TransactionObserver.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/mem_fn.hpp>
@@ -26,8 +27,11 @@
 using boost::mem_fn;
 using namespace qpid::broker;
 
+TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
+
 bool TxBuffer::prepare(TransactionContext* const ctxt)
 {
+    if (!observer->prepare()) return false;
     for(op_iterator i = ops.begin(); i != ops.end(); i++){
         if(!(*i)->prepare(ctxt)){
             return false;
@@ -38,18 +42,21 @@ bool TxBuffer::prepare(TransactionContex
 
 void TxBuffer::commit()
 {
+    observer->commit();
     std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::commit));
     ops.clear();
 }
 
 void TxBuffer::rollback()
 {
+    observer->rollback();
     std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::rollback));
     ops.clear();
 }
 
 void TxBuffer::enlist(TxOp::shared_ptr op)
 {
+    op->callObserver(observer);
     ops.push_back(op);
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,21 +34,21 @@
  * transaction. This work can be committed or rolled back. Committing
  * is a two-stage process: first all the operations should be
  * prepared, then if that succeeds they can be committed.
- * 
+ *
  * In the 2pc case, a successful prepare may be followed by either a
  * commit or a rollback.
- * 
+ *
  * Atomicity of prepare is ensured by using a lower level
  * transactional facility. This saves explicitly rolling back all the
  * successfully prepared ops when one of them fails. i.e. we do not
  * use 2pc internally, we instead ensure that prepare is atomic at a
  * lower level. This makes individual prepare operations easier to
  * code.
- * 
+ *
  * Transactions on a messaging broker effect three types of 'action':
  * (1) updates to persistent storage (2) updates to transient storage
  * or cached data (3) network writes.
- * 
+ *
  * Of these, (1) should always occur atomically during prepare to
  * ensure that if the broker crashes while a transaction is being
  * completed the persistent state (which is all that then remains) is
@@ -58,59 +58,74 @@
  * TransactionalStore in use.
  */
 namespace qpid {
-    namespace broker {
-        class TxBuffer{
-            typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
-            std::vector<TxOp::shared_ptr> ops;
-        protected:
-
-        public:
-            typedef boost::shared_ptr<TxBuffer> shared_ptr;
-            /**
-             * Adds an operation to the transaction.
-             */
-            QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op);
-
-            /**
-             * Requests that all ops are prepared. This should
-             * primarily involve making sure that a persistent record
-             * of the operations is stored where necessary.
-             *
-             * Once prepared, a transaction can be committed (or in
-             * the 2pc case, rolled back).
-             * 
-             * @returns true if all the operations prepared
-             * successfully, false if not.
-             */
-            QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt);
-
-            /**
-             * Signals that the ops all prepared successfully and can
-             * now commit, i.e. the operation can now be fully carried
-             * out.
-             * 
-             * Should only be called after a call to prepare() returns
-             * true.
-             */
-            QPID_BROKER_EXTERN void commit();
-
-            /**
-             * Signals that all ops can be rolled back.
-             * 
-             * Should only be called either after a call to prepare()
-             * returns true (2pc) or instead of a prepare call
-             * ('server-local')
-             */
-            QPID_BROKER_EXTERN void rollback();
-
-            /**
-             * Helper method for managing the process of server local
-             * commit
-             */
-            QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
-        };
+
+namespace broker {
+class TransactionObserver;
+
+class TxBuffer {
+ private:
+    typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
+    std::vector<TxOp::shared_ptr> ops;
+    boost::shared_ptr<TransactionObserver> observer;
+
+ public:
+    typedef boost::shared_ptr<TxBuffer> shared_ptr;
+
+    QPID_BROKER_EXTERN TxBuffer();
+
+    /**
+     * Adds an operation to the transaction.
+     */
+    QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op);
+
+    /**
+     * Requests that all ops are prepared. This should
+     * primarily involve making sure that a persistent record
+     * of the operations is stored where necessary.
+     *
+     * Once prepared, a transaction can be committed (or in
+     * the 2pc case, rolled back).
+     *
+     * @returns true if all the operations prepared
+     * successfully, false if not.
+     */
+    QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt);
+
+    /**
+     * Signals that the ops all prepared successfully and can
+     * now commit, i.e. the operation can now be fully carried
+     * out.
+     *
+     * Should only be called after a call to prepare() returns
+     * true.
+     */
+    QPID_BROKER_EXTERN void commit();
+
+    /**
+     * Signals that all ops can be rolled back.
+     *
+     * Should only be called either after a call to prepare()
+     * returns true (2pc) or instead of a prepare call
+     * ('server-local')
+     */
+    QPID_BROKER_EXTERN void rollback();
+
+    /**
+     * Helper method for managing the process of server local
+     * commit
+     */
+    QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+
+
+    QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) {
+        observer = o;
+    }
+
+    QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const {
+        return observer;
     }
-}
+};
 
+}} // namespace qpid::broker
 
 #endif

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h Fri Sep 20 18:59:30 2013
@@ -25,17 +25,19 @@
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
-    namespace broker {
+namespace broker {
+class TransactionObserver;
 
-        class TxOp{
-        public:
-            typedef boost::shared_ptr<TxOp> shared_ptr;
-
-            virtual bool prepare(TransactionContext*) throw() = 0;
-            virtual void commit()  throw() = 0;
-            virtual void rollback()  throw() = 0;
-            virtual ~TxOp(){}
-        };
+class TxOp{
+  public:
+    typedef boost::shared_ptr<TxOp> shared_ptr;
+
+    virtual bool prepare(TransactionContext*) throw() = 0;
+    virtual void commit()  throw() = 0;
+    virtual void rollback()  throw() = 0;
+    virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) = 0;
+    virtual ~TxOp(){}
+};
 
 }} // namespace qpid::broker
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp Fri Sep 20 18:59:30 2013
@@ -43,6 +43,11 @@ Vhost::Vhost (qpid::management::Manageab
     }
 }
 
+Vhost::~Vhost () {
+    if (mgmtObject != 0)
+        mgmtObject->debugStats("destroying");
+}
+
 void Vhost::setFederationTag(const std::string& tag)
 {
     mgmtObject->set_federationTag(tag);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h Fri Sep 20 18:59:30 2013
@@ -40,6 +40,8 @@ class Vhost : public management::Managea
 
     Vhost (management::Manageable* parentBroker, Broker* broker = 0);
 
+    ~Vhost ();
+
     management::ManagementObject::shared_ptr GetManagementObject (void) const
     { return mgmtObject; }
     void setFederationTag(const std::string& tag);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -182,12 +182,9 @@ void Connection::open()
 
 void Connection::readPeerProperties()
 {
-    /**
-     * TODO: enable when proton 0.5 has been released:
     qpid::types::Variant::Map properties;
     DataReader::read(pn_connection_remote_properties(connection), properties);
     setPeerProperties(properties);
-    */
 }
 
 void Connection::closed()

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp Fri Sep 20 18:59:30 2013
@@ -160,7 +160,7 @@ void DataReader::readArray(pn_data_t* /*
 void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
 {
     size_t count = pn_data_get_list(data);
-    bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
+    bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor);
     if (!skip) {
         pn_data_enter(data);
         for (size_t i = 0; i < count && pn_data_next(data); ++i) {
@@ -174,7 +174,7 @@ void DataReader::readList(pn_data_t* dat
 void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
 {
     size_t count = pn_data_get_map(data);
-    reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor);
+    reader.onStartMap(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor);
     pn_data_enter(data);
     for (size_t i = 0; i < count && pn_data_next(data); ++i) {
         read(data);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Fri Sep 20 18:59:30 2013
@@ -25,6 +25,7 @@
 #include "qpid/amqp/descriptors.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
 
 namespace qpid {
 namespace broker {
@@ -104,7 +105,7 @@ namespace {
 }
 
 DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
-    : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
+    : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()), expiryPolicy(broker.getExpiryPolicy()) {}
 DecodingIncoming::~DecodingIncoming() {}
 
 void DecodingIncoming::readable(pn_delivery_t* delivery)
@@ -116,6 +117,7 @@ void DecodingIncoming::readable(pn_deliv
 
     qpid::broker::Message message(received, received);
     userid.verify(message.getUserId());
+    message.computeExpiration(expiryPolicy);
     handle(message);
     --window;
     received->begin();

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h Fri Sep 20 18:59:30 2013
@@ -77,6 +77,7 @@ class DecodingIncoming : public Incoming
     virtual void handle(qpid::broker::Message&) = 0;
   private:
     boost::shared_ptr<Session> session;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 };
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Sep 20 18:59:30 2013
@@ -21,9 +21,13 @@
 #include "Message.h"
 #include "qpid/amqp/Decoder.h"
 #include "qpid/amqp/descriptors.h"
-#include "qpid/amqp/Reader.h"
-#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
 #include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/Reader.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/Buffer.h"
 #include <string.h>
@@ -121,8 +125,8 @@ namespace {
             }
         }
 
-        bool onStartList(uint32_t, const CharSequence&, const Descriptor*) { return false; }
-        bool onStartMap(uint32_t, const CharSequence&, const Descriptor*) { return false; }
+        bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; }
+        bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; }
         bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*) { return false; }
 
     public:
@@ -144,8 +148,12 @@ void Message::processProperties(MapHandl
 //and whether it should indeed only be the content that is thus
 //measured
 uint64_t Message::getContentSize() const { return data.size(); }
-//getContent() is used primarily for decoding qmf messages in management and ha
-std::string Message::getContent() const { return empty; }
+//getContent() is used primarily for decoding qmf messages in
+//management and ha, but also by the xml exchange
+std::string Message::getContent() const
+{
+    return std::string(body.data, body.size);
+}
 
 Message::Message(size_t size) : data(size)
 {
@@ -253,12 +261,54 @@ void Message::onGroupId(const qpid::amqp
 void Message::onGroupSequence(uint32_t) {}
 void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
 
-void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; }
-void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
-void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
-void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; }
-void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
-void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; }
+void Message::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { applicationProperties = v; }
+void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
+void Message::onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
+
+void Message::onData(const qpid::amqp::CharSequence& v) { body = v; }
+void Message::onAmqpSequence(const qpid::amqp::CharSequence& v) { body = v; bodyType = qpid::amqp::typecodes::LIST_NAME; }
+void Message::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& t)
+{
+    body = v;
+    if (t == qpid::amqp::typecodes::STRING_NAME) {
+        bodyType = qpid::types::encodings::UTF8;
+    } else if (t == qpid::amqp::typecodes::SYMBOL_NAME) {
+        bodyType = qpid::types::encodings::ASCII;
+    } else if (t == qpid::amqp::typecodes::BINARY_NAME) {
+        bodyType = qpid::types::encodings::BINARY;
+    } else {
+        bodyType = t;
+    }
+}
+void Message::onAmqpValue(const qpid::types::Variant& v) { typedBody = v; }
+
+void Message::onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { footer = v; }
+
+bool Message::isTypedBody() const
+{
+    return !typedBody.isVoid() || !bodyType.empty();
+}
+
+qpid::types::Variant Message::getTypedBody() const
+{
+    if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+        qpid::amqp::ListBuilder builder;
+        qpid::amqp::Decoder decoder(body.data, body.size);
+        decoder.read(builder);
+        return builder.getList();
+    } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+        qpid::amqp::MapBuilder builder;
+        qpid::amqp::Decoder decoder(body.data, body.size);
+        decoder.read(builder);
+        return builder.getMap();
+    } else if (!bodyType.empty()) {
+        qpid::types::Variant value(std::string(body.data, body.size));
+        value.setEncoding(bodyType);
+        return value;
+    } else {
+        return typedBody;
+    }
+}
 
 
 //PersistableMessage interface:
@@ -292,31 +342,41 @@ void Message::decodeHeader(framing::Buff
 }
 void Message::decodeContent(framing::Buffer& /*buffer*/) {}
 
-boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& added) const
 {
     //message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations
-    size_t extra = 0;
+    std::map<std::string, qpid::types::Variant> combined;
+    const std::map<std::string, qpid::types::Variant>* annotations(0);
     if (messageAnnotations) {
-        //TODO: actual merge required
+        //combine existing and added annotations (TODO: this could be
+        //optimised by avoiding the decode and simply 'editing' the
+        //size and count in the raw data, then appending the new
+        //elements).
+        qpid::amqp::MapBuilder builder;
+        qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+        decoder.read(builder);
+        combined = builder.getMap();
+        for (std::map<std::string, qpid::types::Variant>::const_iterator i = added.begin(); i != added.end(); ++i) {
+            combined[i->first] = i->second;
+        }
+        annotations = &combined;
     } else {
-        //add whole new section
-        extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true);
+        //additions form a whole new section
+        annotations = &added;
     }
-    boost::intrusive_ptr<Message> copy(new Message(data.size()+extra));
+    size_t annotationsSize = qpid::amqp::MessageEncoder::getEncodedSize(*annotations, true) + 3/*descriptor*/;
+
+    boost::intrusive_ptr<Message> copy(new Message(bareMessage.size+footer.size+deliveryAnnotations.size+annotationsSize));
     size_t position(0);
-    if (deliveryAnnotations) {
+    if (deliveryAnnotations.size) {
         ::memcpy(&copy->data[position], deliveryAnnotations.data, deliveryAnnotations.size);
         position += deliveryAnnotations.size;
     }
-    if (messageAnnotations) {
-        //TODO: actual merge required
-        ::memcpy(&copy->data[position], messageAnnotations.data, messageAnnotations.size);
-        position += messageAnnotations.size;
-    } else {
-        qpid::amqp::MessageEncoder encoder(&copy->data[position], extra);
-        encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true);
-        position += extra;
-    }
+
+    qpid::amqp::Encoder encoder(&copy->data[position], annotationsSize);
+    encoder.writeMap(*annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true);
+    position += encoder.getPosition();
+
     if (bareMessage) {
         ::memcpy(&copy->data[position], bareMessage.data, bareMessage.size);
         position += bareMessage.size;
@@ -325,7 +385,18 @@ boost::intrusive_ptr<PersistableMessage>
         ::memcpy(&copy->data[position], footer.data, footer.size);
         position += footer.size;
     }
+    copy->data.resize(position);//annotationsSize may be slightly bigger than needed if optimisations are used (e.g. smallint)
     copy->scan();
+    {
+        qpid::amqp::MapBuilder builder;
+        qpid::amqp::Decoder decoder(copy->messageAnnotations.data, copy->messageAnnotations.size);
+        decoder.read(builder);
+        QPID_LOG(notice, "Merged annotations are now: " << builder.getMap() << " raw=" << std::hex << std::string(copy->messageAnnotations.data, copy->messageAnnotations.size) << " " << copy->messageAnnotations.size << " bytes");
+    }
+    assert(copy->messageAnnotations);
+    assert(copy->bareMessage.size == bareMessage.size);
+    assert(copy->footer.size == footer.size);
+    assert(copy->deliveryAnnotations.size == deliveryAnnotations.size);
     return copy;
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Sep 20 18:59:30 2013
@@ -64,6 +64,8 @@ class Message : public qpid::broker::Mes
     qpid::amqp::CharSequence getBareMessage() const;
     qpid::amqp::CharSequence getBody() const;
     qpid::amqp::CharSequence getFooter() const;
+    bool isTypedBody() const;
+    qpid::types::Variant getTypedBody() const;
 
     Message(size_t size);
     char* getData();
@@ -109,6 +111,8 @@ class Message : public qpid::broker::Mes
 
     //body:
     qpid::amqp::CharSequence body;
+    qpid::types::Variant typedBody;
+    std::string bodyType;
 
     //footer:
     qpid::amqp::CharSequence footer;
@@ -136,12 +140,16 @@ class Message : public qpid::broker::Mes
     void onGroupSequence(uint32_t);
     void onReplyToGroupId(const qpid::amqp::CharSequence&);
 
-    void onApplicationProperties(const qpid::amqp::CharSequence&);
-    void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
-    void onMessageAnnotations(const qpid::amqp::CharSequence&);
-    void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
-    void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
-    void onFooter(const qpid::amqp::CharSequence&);
+    void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+    void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+    void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+
+    void onData(const qpid::amqp::CharSequence&);
+    void onAmqpSequence(const qpid::amqp::CharSequence&);
+    void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+    void onAmqpValue(const qpid::types::Variant&);
+
+    void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Fri Sep 20 18:59:30 2013
@@ -29,6 +29,7 @@
 #include "qpid/sys/OutputControl.h"
 #include "qpid/amqp/descriptors.h"
 #include "qpid/amqp/MessageEncoder.h"
+#include "qpid/framing/Buffer.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
@@ -44,14 +45,16 @@ void Outgoing::wakeup()
     session.wakeup();
 }
 
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
+                                     qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
     : Outgoing(broker, session, source, target, pn_link_name(l)),
-      Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+      Consumer(pn_link_name(l), type),
       exclusive(e),
       isControllingUser(p),
       queue(q), deliveries(5000), link(l), out(o),
       current(0), outstanding(0),
-      buffer(1024)/*used only for header at present*/
+      buffer(1024)/*used only for header at present*/,
+      unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
 {
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         deliveries[i].init(i);
@@ -91,8 +94,7 @@ void OutgoingFromQueue::write(const char
 
 void OutgoingFromQueue::handle(pn_delivery_t* delivery)
 {
-    pn_delivery_tag_t tag = pn_delivery_tag(delivery);
-    size_t i = *reinterpret_cast<const size_t*>(tag.bytes);
+    size_t i = Record::getIndex(pn_delivery_tag(delivery));
     Record& r = deliveries[i];
     if (pn_delivery_writable(delivery)) {
         assert(r.msg);
@@ -104,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delive
         write(&buffer[0], encoder.getPosition());
         Translation t(r.msg);
         t.write(*this);
+        if (unreliable) pn_delivery_settle(delivery);
         if (pn_link_advance(link)) {
             --outstanding;
             outgoingMessageSent();
@@ -112,26 +115,28 @@ void OutgoingFromQueue::handle(pn_delive
             QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
         }
     }
-    if (pn_delivery_updated(delivery)) {
+    if (unreliable) {
+        if (preAcquires()) queue->dequeue(0, r.cursor);
+        r.reset();
+    } else if (pn_delivery_updated(delivery)) {
         assert(r.delivery == delivery);
         r.disposition = pn_delivery_remote_state(delivery);
         if (r.disposition) {
             switch (r.disposition) {
               case PN_ACCEPTED:
-                //TODO: only if consuming
-                queue->dequeue(0, r.cursor);
+                if (preAcquires()) queue->dequeue(0, r.cursor);
                 outgoingMessageAccepted();
                 break;
               case PN_REJECTED:
-                queue->reject(r.cursor);
+                if (preAcquires()) queue->reject(r.cursor);
                 outgoingMessageRejected();
                 break;
               case PN_RELEASED:
-                queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+                if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
                 outgoingMessageRejected();//TODO: not quite true...
                 break;
               case PN_MODIFIED:
-                queue->release(r.cursor, true);//TODO: proper handling of modified
+                if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
                 outgoingMessageRejected();//TODO: not quite true...
                 break;
               default:
@@ -251,12 +256,17 @@ qpid::broker::OwnershipToken* OutgoingFr
     return 0;
 }
 
-OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {}
+OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0)
+{
+    tag.bytes = tagData;
+    tag.size = TAG_WIDTH;
+}
 void OutgoingFromQueue::Record::init(size_t i)
 {
     index = i;
-    tag.bytes = reinterpret_cast<const char*>(&index);
-    tag.size = sizeof(index);
+    qpid::framing::Buffer buffer(tagData, tag.size);
+    assert(index <= std::numeric_limits<uint32_t>::max());
+    buffer.putLong(index);
 }
 void OutgoingFromQueue::Record::reset()
 {
@@ -266,5 +276,13 @@ void OutgoingFromQueue::Record::reset()
     disposition = 0;
 }
 
+size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
+{
+    assert(t.size == TAG_WIDTH);
+    qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size);
+    return (size_t) buffer.getLong();
+}
+
+
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Fri Sep 20 18:59:30 2013
@@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingL
 class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
 {
   public:
-    OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
+    OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
+                      qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
     void init();
@@ -117,10 +118,17 @@ class OutgoingFromQueue : public Outgoin
         int disposition;
         size_t index;
         pn_delivery_tag_t tag;
+        //The delivery tag is a 4 byte value representing the
+        //index. It is encoded separately to avoid alignment issues.
+        //The number of deliveries held here is always strictly
+        //bounded, so 4 bytes is more than enough.
+        static const size_t TAG_WIDTH = sizeof(uint32_t);
+        char tagData[TAG_WIDTH];
 
         Record();
         void init(size_t i);
         void reset();
+        static size_t getIndex(pn_delivery_tag_t);
     };
 
     const bool exclusive;
@@ -134,6 +142,7 @@ class OutgoingFromQueue : public Outgoin
     std::vector<char> buffer;
     std::string subjectFilter;
     boost::scoped_ptr<Selector> selector;
+    bool unreliable;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Fri Sep 20 18:59:30 2013
@@ -105,14 +105,14 @@ void Relay::detached(Outgoing*)
 {
     out = 0;
     isDetached = true;
-    std::cerr << "Outgoing link detached from relay" << std::endl;
+    QPID_LOG(info, "Outgoing link detached from relay [" << this << "]");
     if (in) in->wakeup();
 }
 void Relay::detached(Incoming*)
 {
     in = 0;
     isDetached = true;
-    std::cerr << "Incoming link detached from relay" << std::endl;
+    QPID_LOG(info, "Incoming link detached from relay [" << this << "]");
     if (out) out->wakeup();
 }
 
@@ -139,13 +139,13 @@ void OutgoingFromRelay::handle(pn_delive
     if (pn_delivery_writable(delivery)) {
         if (transfer->write(link)) {
             outgoingMessageSent();
-            QPID_LOG(debug, "Sent relayed message " << name);
+            QPID_LOG(debug, "Sent relayed message " << name << " [" << relay.get() << "]");
         } else {
-            QPID_LOG(error, "Failed to send relayed message " << name);
+            QPID_LOG(error, "Failed to send relayed message " << name << " [" << relay.get() << "]");
         }
     }
     if (pn_delivery_updated(delivery)) {
-        pn_disposition_t d = transfer->updated();
+        uint64_t d = transfer->updated();
         switch (d) {
           case PN_ACCEPTED:
             outgoingMessageAccepted();
@@ -226,6 +226,7 @@ void IncomingToRelay::detached()
     relay->detached(this);
 }
 
+BufferedTransfer::BufferedTransfer() : disposition(0) {}
 void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d)
 {
     in.handle = d;
@@ -264,7 +265,7 @@ void BufferedTransfer::initOut(pn_link_t
     pn_delivery_set_context(out.handle, this);
 }
 
-pn_disposition_t BufferedTransfer::updated()
+uint64_t BufferedTransfer::updated()
 {
     disposition = pn_delivery_remote_state(out.handle);
     if (disposition) {

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h Fri Sep 20 18:59:30 2013
@@ -45,10 +45,11 @@ struct Delivery
 class BufferedTransfer
 {
   public:
+    BufferedTransfer();
     void initIn(pn_link_t* link, pn_delivery_t* d);
     bool settle();
     void initOut(pn_link_t* link);
-    pn_disposition_t updated();
+    uint64_t updated();
     bool write(pn_link_t*);
   private:
     std::vector<char> data;
@@ -56,7 +57,7 @@ class BufferedTransfer
     Delivery out;
     pn_delivery_tag_t dt;
     std::vector<char> tag;
-    pn_disposition_t disposition;
+    uint64_t disposition;
 };
 
 /**

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp Fri Sep 20 18:59:30 2013
@@ -36,7 +36,7 @@ namespace amqp {
 SaslClient::SaslClient(qpid::sys::OutputControl& out_, const std::string& id, boost::shared_ptr<Interconnect> c, std::auto_ptr<qpid::Sasl> s,
                        const std::string& hostname_, const std::string& mechs, const qpid::sys::SecuritySettings& t)
     : qpid::amqp::SaslClient(id), out(out_), connection(c), sasl(s),
-      hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(false), haveOutput(false), state(NONE) {}
+      hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(false), haveOutput(false), initialised(false), state(NONE) {}
 
 SaslClient::~SaslClient()
 {
@@ -67,8 +67,10 @@ std::size_t SaslClient::encode(char* buf
          encoded += writeProtocolHeader(buffer, size);
          writeHeader = !encoded;
     }
-    if (state == NONE && encoded < size) {
-        encoded += write(buffer + encoded, size - encoded);
+    if ((!initialised || state == NONE) && encoded < size) {
+        size_t extra = write(buffer + encoded, size - encoded);
+        encoded += extra;
+        initialised = extra > 0;
     } else if (state == SUCCEEDED) {
         if (securityLayer.get()) encoded += securityLayer->encode(buffer + encoded, size - encoded);
         else encoded += connection->encode(buffer + encoded, size - encoded);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h Fri Sep 20 18:59:30 2013
@@ -64,6 +64,7 @@ class SaslClient : public qpid::sys::Con
     bool readHeader;
     bool writeHeader;
     bool haveOutput;
+    bool initialised;
     enum {
         NONE, FAILED, SUCCEEDED
     } state;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Sep 20 18:59:30 2013
@@ -36,6 +36,7 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/broker/Selector.h"
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/amqp/Filter.h"
@@ -56,15 +57,16 @@ namespace broker {
 namespace amqp {
 
 namespace {
-bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+pn_bytes_t convert(const std::string& s)
 {
-    pn_data_rewind(capabilities);
-    while (pn_data_next(capabilities)) {
-        pn_bytes_t c = pn_data_get_symbol(capabilities);
-        std::string s(c.start, c.size);
-        if (s == name) return true;
-    }
-    return false;
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data());
+    result.size = s.size();
+    return result;
+}
+std::string convert(pn_bytes_t in)
+{
+    return std::string(in.start, in.size);
 }
 //capabilities
 const std::string CREATE_ON_DEMAND("create-on-demand");
@@ -75,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-
 const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
 const std::string SHARED("shared");
 
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
 {
-    pn_data_rewind(in);
-    while (pn_data_next(in)) {
-        pn_bytes_t c = pn_data_get_symbol(in);
-        std::string s(c.start, c.size);
-        if (s == DURABLE) {
-            if (node->isDurable()) pn_data_put_symbol(out, c);
-        } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
-            pn_data_put_symbol(out, c);
+    if (supported.size() == 1) {
+        pn_data_put_symbol(out, convert(supported.front()));
+    } else if (supported.size() > 1) {
+        pn_data_put_array(out, false, PN_SYMBOL);
+        pn_data_enter(out);
+        for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
+            pn_data_put_symbol(out, convert(*i));
+        }
+        pn_data_exit(out);
+    }
+}
+
+template <class F>
+void readCapabilities(pn_data_t* data, F f)
+{
+    pn_data_rewind(data);
+    if (pn_data_next(data)) {
+        pn_type_t type = pn_data_type(data);
+        if (type == PN_ARRAY) {
+            pn_data_enter(data);
+            while (pn_data_next(data)) {
+                f(convert(pn_data_get_symbol(data)));
+            }
+            pn_data_exit(data);
+        } else if (type == PN_SYMBOL) {
+            f(convert(pn_data_get_symbol(data)));
+        } else {
+            QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
         }
     }
 }
 
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+void matchCapability(const std::string& name, bool* result, const std::string& s)
 {
-    pn_data_rewind(in);
-    while (pn_data_next(in)) {
-        pn_bytes_t c = pn_data_get_symbol(in);
-        std::string s(c.start, c.size);
-        if (s == DURABLE) {
-            if (node->isDurable()) pn_data_put_symbol(out, c);
-        } else if (s == SHARED) {
-            pn_data_put_symbol(out, c);
-        } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
-            pn_data_put_symbol(out, c);
-        } else if (s == DIRECT_FILTER) {
-            if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
-        } else if (s == TOPIC_FILTER) {
-            if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
-        }
+    if (s == name) *result = true;
+}
+
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+    bool result(false);
+    readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
+    return result;
+}
+
+void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
+{
+    if (s == DURABLE) {
+        if (node->isDurable()) supported->push_back(s);
+    } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+        supported->push_back(s);
     }
 }
 
+void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
+{
+    if (s == DURABLE) {
+        if (node->isDurable()) supported->push_back(s);
+    } else if (s == SHARED) {
+        supported->push_back(s);
+    } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+        supported->push_back(s);
+    } else if (s == DIRECT_FILTER) {
+        if (node->getType() == DirectExchange::typeName) supported->push_back(s);
+    } else if (s == TOPIC_FILTER) {
+        if (node->getType() == TopicExchange::typeName) supported->push_back(s);
+    }
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+    std::vector<std::string> supported;
+    readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
+    writeCapabilities(out, supported);
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+    std::vector<std::string> supported;
+    readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
+    writeCapabilities(out, supported);
+}
+
 }
 
 class IncomingToQueue : public DecodingIncoming
@@ -149,6 +201,12 @@ Session::ResolvedNode Session::resolve(c
     node.queue = connection.getBroker().getQueues().find(name);
     node.topic = connection.getTopics().get(name);
     if (node.topic) node.exchange = node.topic->getExchange();
+    if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+        node.properties.read(pn_terminus_properties(terminus));
+        if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+            throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
+        }
+    }
     if (!node.queue && !node.exchange) {
         if (pn_terminus_is_dynamic(terminus)  || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
             //is it a queue or an exchange?
@@ -316,10 +374,10 @@ void Session::setupOutgoing(pn_link_t* l
         target = targetAddress;
     }
 
-
     if (node.queue) {
         authorise.outgoing(node.queue);
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink()));
+        SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
         q->init();
         filter.apply(q);
         outgoing[link] = q;
@@ -333,8 +391,12 @@ void Session::setupOutgoing(pn_link_t* l
             settings.durable = durable;
             settings.autodelete = !durable;
         }
+        settings.autoDeleteDelay = pn_terminus_get_timeout(source);
+        if (settings.autoDeleteDelay) {
+            settings.autodelete = true;
+            settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+        }
         filter.configure(settings);
-        //TODO: populate settings from source details when available from engine
         std::stringstream queueName;
         if (shared) {
             //just use link name (TODO: could allow this to be
@@ -350,9 +412,9 @@ void Session::setupOutgoing(pn_link_t* l
         if (!shared) queue->setExclusiveOwner(this);
         authorise.outgoing(node.exchange, queue, filter);
         filter.bind(node.exchange, queue);
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false));
-        outgoing[link] = q;
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
         q->init();
+        outgoing[link] = q;
     } else if (node.relay) {
         boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
         outgoing[link] = out;
@@ -503,7 +565,6 @@ bool Session::dispatch()
 
 void Session::close()
 {
-    exclusiveQueues.clear();
     for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
         i->second->detached();
     }
@@ -516,6 +577,7 @@ void Session::close()
     for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) {
         (*i)->releaseExclusiveOwnership();
     }
+    exclusiveQueues.clear();
     qpid::sys::Mutex::ScopedLock l(lock);
     deleted = true;
 }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Fri Sep 20 18:59:30 2013
@@ -47,6 +47,13 @@ bool testProperty(const std::string& k, 
     else return i->second;
 }
 
+qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties)
+{
+    qpid::types::Variant::Map remaining(properties); //work on a copy, the caller's map is left untouched
+    remaining.erase(EXCHANGE);
+    remaining.erase(DURABLE);
+    return remaining; //everything supplied except the DURABLE and EXCHANGE entries
+}
 }
 
 Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
@@ -60,7 +67,7 @@ Topic::Topic(Broker& broker, const std::
     qpid::management::ManagementAgent* agent = broker.getManagementAgent();
     if (agent != 0) {
         topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
-        topic->set_properties(policy.asMap());
+        topic->set_properties(filter(properties));
         agent->addObject(topic);
     }
 }
@@ -117,8 +124,12 @@ bool TopicRegistry::deleteObject(Broker&
 {
     if (type == TOPIC) {
         boost::shared_ptr<Topic> topic = remove(name);
-        if (topic->isDurable()) broker.getStore().destroy(*topic);
-        return true;
+        if (topic) {
+            if (topic->isDurable()) broker.getStore().destroy(*topic);
+            return true;
+        } else {
+            return false;
+        }
     } else {
         return false;
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Sep 20 18:59:30 2013
@@ -27,6 +27,7 @@
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include <boost/lexical_cast.hpp>
@@ -38,6 +39,9 @@ namespace {
 
 const std::string EMPTY;
 const std::string FORWARD_SLASH("/");
+const std::string TEXT_PLAIN("text/plain");
+const std::string SUBJECT_KEY("qpid.subject");
+const std::string APP_ID("x-amqp-0-10.app-id");
 
 qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
 {
@@ -98,8 +102,25 @@ class Properties_0_10 : public qpid::amq
     std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
     bool hasTo() const { return getDestination().size() || hasSubject(); }
     std::string getTo() const { return  getDestination().size() ? getDestination() : getSubject(); }
-    bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
-    std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+    bool hasSubject() const
+    {
+        if (!getDestination().empty()) {
+            return deliveryProperties && deliveryProperties->hasRoutingKey();
+        }
+        //application headers hang off messageProperties, which may be null (as the other accessors in this class assume)
+        return messageProperties && getApplicationProperties().isSet(SUBJECT_KEY);
+    }
+    std::string getSubject() const
+    {
+        if (getDestination().empty()) {
+            //message was sent to default exchange, routing key is the queue name, so the subject
+            //is read from the application headers (none are available if messageProperties is null)
+            return messageProperties ? getApplicationProperties().getAsString(SUBJECT_KEY) : EMPTY;
+        } else if (deliveryProperties) {
+            return deliveryProperties->getRoutingKey();
+        }
+        return EMPTY;
+    }
     bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
     std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
     bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
@@ -119,7 +140,7 @@ class Properties_0_10 : public qpid::amq
     bool hasReplyToGroupId() const { return false; }
     std::string getReplyToGroupId() const { return EMPTY; }
 
-    const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+    const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); } //NOTE(review): messageProperties is dereferenced without the null check the other accessors use - confirm callers guarantee it is set
     Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
                                                                          messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
                                                                          deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
@@ -138,7 +159,6 @@ class Properties_0_10 : public qpid::amq
 
 Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
 
-
 boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
 {
     boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
@@ -161,13 +181,38 @@ boost::intrusive_ptr<const qpid::broker:
             transfer->getFrames().append(method);
             transfer->getFrames().append(header);
 
-            qpid::amqp::CharSequence body = message->getBody();
-            content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
-            transfer->getFrames().append(content);
-
             qpid::framing::MessageProperties* props =
                 transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
-            props->setContentLength(body.size);
+
+            if (message->isTypedBody()) {
+                qpid::types::Variant body = message->getTypedBody();
+                std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
+                if (body.getType() == qpid::types::VAR_MAP) {
+                    qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
+                    props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
+                } else if (body.getType() == qpid::types::VAR_LIST) {
+                    qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
+                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+                } else if (body.getType() == qpid::types::VAR_STRING) {
+                    data = body.getString();
+                    if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
+                        props->setContentType(TEXT_PLAIN);
+                    }
+                } else {
+                    qpid::types::Variant::List container;
+                    container.push_back(body);
+                    qpid::amqp_0_10::ListCodec::encode(container, data);
+                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+                }
+                transfer->getFrames().append(content);
+                props->setContentLength(data.size());
+            } else {
+                qpid::amqp::CharSequence body = message->getBody();
+                content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+                transfer->getFrames().append(content);
+
+                props->setContentLength(body.size);
+            }
 
             qpid::amqp::MessageId mid = message->getMessageId();
             qpid::framing::Uuid uuid;
@@ -209,13 +254,25 @@ boost::intrusive_ptr<const qpid::broker:
             if (ap) {
                 qpid::amqp::Decoder d(ap.data, ap.size);
                 qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
+                std::string appid = props->getApplicationHeaders().getAsString(APP_ID);
+                if (!appid.empty()) {
+                    props->setAppId(appid);
+                }
             }
 
             qpid::framing::DeliveryProperties* dp =
                 transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
             dp->setPriority(message->getPriority());
             if (message->isPersistent()) dp->setDeliveryMode(2);
-            if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+            if (message->getRoutingKey().size()) {
+                if (message->getRoutingKey().size() > std::numeric_limits<uint8_t>::max()) {
+                    //have to truncate routing key as it is specified to be a str8
+                    dp->setRoutingKey(message->getRoutingKey().substr(0,std::numeric_limits<uint8_t>::max()));
+                } else {
+                    dp->setRoutingKey(message->getRoutingKey());
+                }
+                props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
+            }
 
             return transfer.get();
         } else {
@@ -226,10 +283,11 @@ boost::intrusive_ptr<const qpid::broker:
 
 void Translation::write(OutgoingFromQueue& out)
 {
-    const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+    const Message* message = dynamic_cast<const Message*>(original.getPersistentContext().get());
+    //persistent context will contain any newly added annotations
+    if (!message) message = dynamic_cast<const Message*>(&original.getEncoding());
     if (message) {
         //write annotations
-        //TODO: merge in any newly added annotations
         qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations();
         qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations();
         if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size);
@@ -246,14 +304,40 @@ void Translation::write(OutgoingFromQueu
             Properties_0_10 properties(*transfer);
             qpid::types::Variant::Map applicationProperties;
             qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
-            std::string content = transfer->getContent();
-            size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
-            std::vector<char> buffer(size);
-            qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
-            encoder.writeProperties(properties);
-            encoder.writeApplicationProperties(applicationProperties);
-            if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
-            out.write(&buffer[0], encoder.getPosition());
+            if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+                qpid::types::Variant::Map content;
+                qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content);
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+                size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+                size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE);
+                out.write(&buffer[0], encoder.getPosition());
+            } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+                qpid::types::Variant::List content;
+                qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content);
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+                size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+                size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE);
+                out.write(&buffer[0], encoder.getPosition());
+            } else {
+                std::string content = transfer->getContent();
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
+                out.write(&buffer[0], encoder.getPosition());
+            }
         } else {
             QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
         }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -174,6 +174,7 @@ void Connection::requestIOProcessing(boo
 Connection::~Connection()
 {
     if (mgmtObject != 0) {
+        mgmtObject->debugStats("destroying");
         if (!link)
             agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, getUserId(), mgmtObject->get_remoteProperties()));
         QPID_LOG_CAT(debug, model, "Delete connection. user:" << getUserId()

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -127,7 +127,7 @@ void Connection::open(const ConnectionSe
         impl->registerFailureCallback ( failureCallback );
 }
 
-const ConnectionSettings& Connection::getNegotiatedSettings()
+const ConnectionSettings& Connection::getNegotiatedSettings() const
 {
     if (!isOpen())
         throw Exception(QPID_MSG("Connection is not open."));

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Fri Sep 20 18:59:30 2013
@@ -50,6 +50,7 @@ using qpid::sys::Mutex;
 namespace {
 const std::string OK("OK");
 const std::string PLAIN("PLAIN");
+const std::string ANONYMOUS("ANONYMOUS");
 const std::string en_US("en_US");
 
 const std::string INVALID_STATE_START("start received in invalid state");
@@ -244,6 +245,7 @@ void ConnectionHandler::start(const Fiel
 
     std::vector<std::string> mechlist;
     mechlist.reserve(mechanisms.size());
+
     if (mechanism.empty()) {
         //mechlist is simply what the server offers
         std::transform(mechanisms.begin(), mechanisms.end(), std::back_inserter(mechlist), Array::get<std::string, Array::ValuePtr>);
@@ -273,9 +275,25 @@ void ConnectionHandler::start(const Fiel
             proxy.send(body);
         }
     } else {
-        //TODO: verify that desired mechanism and locale are supported
-        std::string response = ((char)0) + username + ((char)0) + password;
-        proxy.startOk(properties, mechanism, response, locale);
+        bool haveAnonymous(false);
+        bool havePlain(false);
+        for (std::vector<std::string>::const_iterator i = mechlist.begin(); i != mechlist.end(); ++i) {
+            if (*i == ANONYMOUS) {
+                haveAnonymous = true;
+                break;
+            } else if (*i == PLAIN) {
+                havePlain = true;
+            }
+        }
+        if (haveAnonymous && (mechanism.empty() || mechanism.find(ANONYMOUS) != std::string::npos)) {
+            proxy.startOk(properties, ANONYMOUS, username, locale);
+        } else if (havePlain && (mechanism.empty() || mechanism.find(PLAIN) !=std::string::npos)) {
+            std::string response = ((char)0) + username + ((char)0) + password;
+            proxy.startOk(properties, PLAIN, response, locale);
+        } else {
+            if (!mechanism.empty()) throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << "; client supports PLAIN or ANONYMOUS, broker supports: " << join(mechlist)));
+            throw Exception(QPID_MSG("No valid mechanism; client supports PLAIN or ANONYMOUS, broker supports: " << join(mechlist)));
+        }
     }
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Fri Sep 20 18:59:30 2013
@@ -89,6 +89,7 @@ const std::string NODE("node");
 const std::string LINK("link");
 const std::string MODE("mode");
 const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
 const std::string NAME("name");
 const std::string DURABLE("durable");
 const std::string X_DECLARE("x-declare");
@@ -240,8 +241,8 @@ class Subscription : public Exchange, pu
     void cancel(qpid::client::AsyncSession& session, const std::string& destination);
   private:
     const std::string queue;
-    const bool reliable;
     const bool durable;
+    const bool reliable;
     const std::string actualType;
     const bool exclusiveQueue;
     const bool exclusiveSubscription;
@@ -516,13 +517,25 @@ std::string Subscription::getSubscriptio
 Subscription::Subscription(const Address& address, const std::string& type)
     : Exchange(address),
       queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
-      reliable(AddressResolution::is_reliable(address)),
       durable(Opt(address)/LINK/DURABLE),
+      //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+      //if not assume it is unreliable unless explicitly stated otherwise
+      reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
       actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
       exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
       exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
       alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
 {
+    const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+    if (timeout) {
+        if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+    } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
+        //if durable but not explicitly reliable, then set a non-zero
+        //default for the autodelete timeout (previously this would
+        //have defaulted to autodelete immediately anyway, so the risk
+        //of the change causing problems is mitigated)
+        queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+    }
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
     std::string selector = Opt(address)/LINK/SELECTOR;
@@ -584,7 +597,7 @@ void Subscription::subscribe(qpid::clien
 
     //create subscription queue:
     session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
-                         arg::autoDelete=!reliable, arg::durable=durable,
+                         arg::autoDelete=!(durable || reliable), arg::durable=durable,
                          arg::alternateExchange=alternateExchange,
                          arg::arguments=queueOptions);
     //'default' binding:
@@ -997,6 +1010,7 @@ Verifier::Verifier()
     link[NAME] = true;
     link[DURABLE] = true;
     link[RELIABILITY] = true;
+    link[TIMEOUT] = true;
     link[X_SUBSCRIBE] = true;
     link[X_DECLARE] = true;
     link[X_BINDINGS] = true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Sep 20 18:59:30 2013
@@ -43,6 +43,7 @@ using qpid::framing::Uuid;
 namespace {
 
 const std::string TCP("tcp");
+const std::string COLON(":");
 double FOREVER(std::numeric_limits<double>::max());
 
 // Time values in seconds can be specified as integer or floating point values.
@@ -86,9 +87,9 @@ bool expired(const sys::AbsTime& start, 
 } // namespace
 
 ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
-    replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+    replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
     minReconnectInterval(0.001), maxReconnectInterval(2),
-    retries(0), reconnectOnLimitExceeded(true)
+    retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
 {
     setOptions(options);
     urls.insert(urls.begin(), url);
@@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std
 {
     sys::Mutex::ScopedLock l(lock);
     if (name == "reconnect") {
-        reconnect = value;
+        autoReconnect = value;
     } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
         timeout = timeValue(value);
     } else if (name == "reconnect-limit" || name == "reconnect_limit") {
@@ -157,8 +158,10 @@ void ConnectionImpl::setOption(const std
         settings.sslCertName = value.asString();
     } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
         reconnectOnLimitExceeded = value;
-    } else if (name == "client-properties") {
+    } else if (name == "client-properties" || name == "client_properties") {
         amqp_0_10::translate(value.asMap(), settings.clientProperties);
+    } else if (name == "disable-auto-decode" || name == "disable_auto_decode") {
+        disableAutoDecode = value;
     } else {
         throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
     }
@@ -254,7 +257,7 @@ void ConnectionImpl::open()
 
 void ConnectionImpl::reopen()
 {
-    if (!reconnect) {
+    if (!autoReconnect) {
         throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
     }
     open();
@@ -265,7 +268,7 @@ void ConnectionImpl::connect(const qpid:
 {
     QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
     for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
-        if (!reconnect) {
+        if (!autoReconnect) {
             throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
         }
         if (limit >= 0 && retries++ >= limit) {
@@ -341,9 +344,52 @@ bool ConnectionImpl::backoff()
     }
 }
 
+void ConnectionImpl::reconnect(const std::string& u)
+{ //opens the connection to the single url u and then resets the sessions; failures are rethrown as qpid::messaging exceptions
+    sys::Mutex::ScopedLock l(lock); //taken before the attempt starts and handed to mergeUrls()/resetSessions() below
+    try {
+        QPID_LOG(info, "Trying to connect to " << u << "...");
+        Url url(u, settings.protocol.size() ? settings.protocol : TCP); //falls back to TCP when settings.protocol is empty
+        if (url.getUser().size()) settings.username = url.getUser(); //credentials carried in the url overwrite those stored in settings
+        if (url.getPass().size()) settings.password = url.getPass();
+        connection.open(url, settings);
+        QPID_LOG(info, "Connected to " << u);
+        mergeUrls(connection.getInitialBrokers(), l);
+        if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+    } catch (const qpid::TransportFailure& e) { //client-level transport failure: reported as the messaging-level TransportFailure
+        QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+        throw qpid::messaging::TransportFailure(e.what());
+    } catch (const std::exception& e) { //NOTE(review): presumably also catches the messaging::TransportFailure thrown in the try block, turning it into a plain MessagingException - confirm this is intended
+        QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+        throw qpid::messaging::MessagingException(e.what());
+    }
+}
+
+void ConnectionImpl::reconnect()
+{
+    //a single tryConnect() attempt; failure is reported to the caller instead of being retried here
+    const bool connected = tryConnect();
+    if (!connected) throw qpid::messaging::TransportFailure("Could not reconnect");
+}
+std::string ConnectionImpl::getUrl() const
+{
+    //empty string when not open, otherwise "<protocol>:<host>:<port>" taken from the negotiated settings
+    if (!isOpen()) return std::string();
+    //look the negotiated settings up once rather than once per field, so all three parts come from the same call
+    const qpid::client::ConnectionSettings& negotiated = connection.getNegotiatedSettings();
+    std::stringstream u;
+    u << negotiated.protocol << COLON << negotiated.host << COLON << negotiated.port;
+    return u.str();
+}
+
 std::string ConnectionImpl::getAuthenticatedUsername()
 {
     return connection.getNegotiatedSettings().username;
 }
 
+bool ConnectionImpl::getAutoDecode() const
+{ //reports whether automatic decoding is enabled for this connection
+    return !disableAutoDecode; //the flag is stored negated: it is set from the "disable-auto-decode" / "disable_auto_decode" option in setOption()
+}
+
 }}} // namespace qpid::client::amqp0_10



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org