You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [5/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 20 18:59:30 2013
@@ -39,7 +39,11 @@ SessionHandler::SessionHandler(amqp_0_10
proxy(out)
{}
-SessionHandler::~SessionHandler() {}
+SessionHandler::~SessionHandler()
+{
+ if (session.get())
+ connection.getBroker().getSessionManager().forget(session->getId());
+}
void SessionHandler::connectionException(
framing::connection::CloseCode code, const std::string& msg)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Sep 20 18:59:30 2013
@@ -79,7 +79,27 @@ void SessionState::addManagementObject()
}
}
+void SessionState::startTx() {
+ if (mgmtObject) { mgmtObject->inc_TxnStarts(); }
+}
+
+void SessionState::commitTx() {
+ if (mgmtObject) {
+ mgmtObject->inc_TxnCommits();
+ mgmtObject->inc_TxnCount();
+ }
+}
+
+void SessionState::rollbackTx() {
+ if (mgmtObject) {
+ mgmtObject->inc_TxnRejects();
+ mgmtObject->inc_TxnCount();
+ }
+}
+
SessionState::~SessionState() {
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 20 18:59:30 2013
@@ -127,6 +127,11 @@ class SessionState : public qpid::Sessio
// belonging to inter-broker bridges
void addManagementObject();
+ // transaction-related methods just to update statistics
+ void startTx();
+ void commitTx();
+ void rollbackTx();
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.cpp Fri Sep 20 18:59:30 2013
@@ -80,3 +80,9 @@ System::System (string _dataDir, Broker*
}
}
+System::~System ()
+{
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
+}
+
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/System.h Fri Sep 20 18:59:30 2013
@@ -45,6 +45,8 @@ class System : public management::Manage
System (std::string _dataDir, Broker* broker = 0);
+ ~System ();
+
management::ManagementObject::shared_ptr GetManagementObject(void) const
{ return mgmtObject; }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Sep 20 18:59:30 2013
@@ -333,7 +333,10 @@ bool TopicExchange::isBound(Queue::share
return false;
}
-TopicExchange::~TopicExchange() {}
+TopicExchange::~TopicExchange() {
+ if (mgmtExchange != 0)
+ mgmtExchange->debugStats("destroying");
+}
const std::string TopicExchange::typeName("topic");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TopicExchange.h Fri Sep 20 18:59:30 2013
@@ -81,7 +81,7 @@ class TopicExchange : public virtual Exc
};
public:
- static const std::string typeName;
+ QPID_BROKER_EXTERN static const std::string typeName;
static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.cpp Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TransactionObserver.h"
+#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
using std::bind1st;
@@ -28,54 +30,23 @@ using namespace qpid::broker;
using qpid::framing::SequenceSet;
using qpid::framing::SequenceNumber;
-TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
-void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
+ acked(_acked), unacked(_unacked)
{
- for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+ for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i)
+ ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last()));
}
-void TxAccept::RangeOp::commit()
-{
- for_each(range.start, range.end, bind(&DeliveryRecord::committed, _1));
- for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
-}
-
-TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {}
-
-void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
-{
- ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
-}
-
-void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
-{
- std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
-}
-
-void TxAccept::RangeOps::commit()
-{
- std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
- //now remove if isRedundant():
- if (!ranges.empty()) {
- DeliveryRecords::iterator begin = ranges.front().range.start;
- DeliveryRecords::iterator end = ranges.back().range.end;
- DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
- unacked.erase(removed, end);
- }
-}
-
-TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
- acked(_acked), unacked(_unacked), ops(unacked)
-{
- //populate the ops
- acked.for_each(ops);
+void TxAccept::each(boost::function<void(DeliveryRecord&)> f) {
+ for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i)
+ for_each(i->start, i->end, f);
}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
{
try{
- ops.prepare(ctxt);
+ each(bind(&DeliveryRecord::dequeue, _1, ctxt));
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -86,10 +57,19 @@ bool TxAccept::prepare(TransactionContex
}
}
-void TxAccept::commit() throw()
+void TxAccept::commit() throw()
{
try {
- ops.commit();
+ each(bind(&DeliveryRecord::committed, _1));
+ each(bind(&DeliveryRecord::setEnded, _1));
+ //now remove if isRedundant():
+ if (!ranges.empty()) {
+ DeliveryRecords::iterator begin = ranges.front().start;
+ DeliveryRecords::iterator end = ranges.back().end;
+ DeliveryRecords::iterator removed =
+ remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+ unacked.erase(removed, end);
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to commit: " << e.what());
} catch(...) {
@@ -98,3 +78,13 @@ void TxAccept::commit() throw()
}
void TxAccept::rollback() throw() {}
+
+namespace {
+void callObserverDR(boost::shared_ptr<TransactionObserver> observer, DeliveryRecord& dr) {
+ observer->dequeue(dr.getQueue(), dr.getMessageId(), dr.getReplicationId());
+}
+} // namespace
+
+void TxAccept::callObserver(const ObserverPtr& observer) {
+ each(boost::bind(&callObserverDR, observer, _1));
+}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxAccept.h Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,58 +21,45 @@
#ifndef _TxAccept_
#define _TxAccept_
-#include <algorithm>
-#include <functional>
-#include <list>
#include "qpid/framing/SequenceSet.h"
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/TxOp.h"
+#include <boost/function.hpp>
+#include <algorithm>
+#include <functional>
+#include <list>
namespace qpid {
- namespace broker {
- /**
- * Defines the transactional behaviour for accepts received by
- * a transactional channel.
- */
- class TxAccept : public TxOp {
- struct RangeOp
- {
- AckRange range;
-
- RangeOp(const AckRange& r);
- void prepare(TransactionContext* ctxt);
- void commit();
- };
-
- struct RangeOps
- {
- std::vector<RangeOp> ranges;
- DeliveryRecords& unacked;
-
- RangeOps(DeliveryRecords& u);
-
- void operator()(framing::SequenceNumber start, framing::SequenceNumber end);
- void prepare(TransactionContext* ctxt);
- void commit();
- };
-
- framing::SequenceSet acked;
- DeliveryRecords& unacked;
- RangeOps ops;
-
- public:
- /**
- * @param acked a representation of the accumulation of
- * acks received
- * @param unacked the record of delivered messages
- */
- TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~TxAccept(){}
- };
- }
+namespace broker {
+/**
+ * Defines the transactional behaviour for accepts received by
+ * a transactional channel.
+ */
+class TxAccept : public TxOp {
+ typedef std::vector<AckRange> AckRanges;
+ typedef boost::shared_ptr<TransactionObserver> ObserverPtr;
+
+ void each(boost::function<void(DeliveryRecord&)>);
+
+ framing::SequenceSet acked;
+ DeliveryRecords& unacked;
+ AckRanges ranges;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ QPID_BROKER_EXTERN TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual void callObserver(const ObserverPtr&);
+ virtual ~TxAccept(){}
+};
+}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Sep 20 18:59:30 2013
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
#include <boost/mem_fn.hpp>
@@ -26,8 +27,11 @@
using boost::mem_fn;
using namespace qpid::broker;
+TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
+
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
+ if (!observer->prepare()) return false;
for(op_iterator i = ops.begin(); i != ops.end(); i++){
if(!(*i)->prepare(ctxt)){
return false;
@@ -38,18 +42,21 @@ bool TxBuffer::prepare(TransactionContex
void TxBuffer::commit()
{
+ observer->commit();
std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::commit));
ops.clear();
}
void TxBuffer::rollback()
{
+ observer->rollback();
std::for_each(ops.begin(), ops.end(), mem_fn(&TxOp::rollback));
ops.clear();
}
void TxBuffer::enlist(TxOp::shared_ptr op)
{
+ op->callObserver(observer);
ops.push_back(op);
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxBuffer.h Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,21 +34,21 @@
* transaction. This work can be committed or rolled back. Committing
* is a two-stage process: first all the operations should be
* prepared, then if that succeeds they can be committed.
- *
+ *
* In the 2pc case, a successful prepare may be followed by either a
* commit or a rollback.
- *
+ *
* Atomicity of prepare is ensured by using a lower level
* transactional facility. This saves explicitly rolling back all the
* successfully prepared ops when one of them fails. i.e. we do not
* use 2pc internally, we instead ensure that prepare is atomic at a
* lower level. This makes individual prepare operations easier to
* code.
- *
+ *
* Transactions on a messaging broker effect three types of 'action':
* (1) updates to persistent storage (2) updates to transient storage
* or cached data (3) network writes.
- *
+ *
* Of these, (1) should always occur atomically during prepare to
* ensure that if the broker crashes while a transaction is being
* completed the persistent state (which is all that then remains) is
@@ -58,59 +58,74 @@
* TransactionalStore in use.
*/
namespace qpid {
- namespace broker {
- class TxBuffer{
- typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
- std::vector<TxOp::shared_ptr> ops;
- protected:
-
- public:
- typedef boost::shared_ptr<TxBuffer> shared_ptr;
- /**
- * Adds an operation to the transaction.
- */
- QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op);
-
- /**
- * Requests that all ops are prepared. This should
- * primarily involve making sure that a persistent record
- * of the operations is stored where necessary.
- *
- * Once prepared, a transaction can be committed (or in
- * the 2pc case, rolled back).
- *
- * @returns true if all the operations prepared
- * successfully, false if not.
- */
- QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt);
-
- /**
- * Signals that the ops all prepared successfully and can
- * now commit, i.e. the operation can now be fully carried
- * out.
- *
- * Should only be called after a call to prepare() returns
- * true.
- */
- QPID_BROKER_EXTERN void commit();
-
- /**
- * Signals that all ops can be rolled back.
- *
- * Should only be called either after a call to prepare()
- * returns true (2pc) or instead of a prepare call
- * ('server-local')
- */
- QPID_BROKER_EXTERN void rollback();
-
- /**
- * Helper method for managing the process of server local
- * commit
- */
- QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
- };
+
+namespace broker {
+class TransactionObserver;
+
+class TxBuffer {
+ private:
+ typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
+ std::vector<TxOp::shared_ptr> ops;
+ boost::shared_ptr<TransactionObserver> observer;
+
+ public:
+ typedef boost::shared_ptr<TxBuffer> shared_ptr;
+
+ QPID_BROKER_EXTERN TxBuffer();
+
+ /**
+ * Adds an operation to the transaction.
+ */
+ QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op);
+
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt);
+
+ /**
+ * Signals that the ops all prepared successfully and can
+ * now commit, i.e. the operation can now be fully carried
+ * out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ QPID_BROKER_EXTERN void commit();
+
+ /**
+ * Signals that all ops can be rolled back.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ QPID_BROKER_EXTERN void rollback();
+
+ /**
+ * Helper method for managing the process of server local
+ * commit
+ */
+ QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+
+
+ QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) {
+ observer = o;
+ }
+
+ QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const {
+ return observer;
}
-}
+};
+}} // namespace qpid::broker
#endif
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/TxOp.h Fri Sep 20 18:59:30 2013
@@ -25,17 +25,19 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
- namespace broker {
+namespace broker {
+class TransactionObserver;
- class TxOp{
- public:
- typedef boost::shared_ptr<TxOp> shared_ptr;
-
- virtual bool prepare(TransactionContext*) throw() = 0;
- virtual void commit() throw() = 0;
- virtual void rollback() throw() = 0;
- virtual ~TxOp(){}
- };
+class TxOp{
+ public:
+ typedef boost::shared_ptr<TxOp> shared_ptr;
+
+ virtual bool prepare(TransactionContext*) throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+ virtual void callObserver(const boost::shared_ptr<TransactionObserver>&) = 0;
+ virtual ~TxOp(){}
+};
}} // namespace qpid::broker
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.cpp Fri Sep 20 18:59:30 2013
@@ -43,6 +43,11 @@ Vhost::Vhost (qpid::management::Manageab
}
}
+Vhost::~Vhost () {
+ if (mgmtObject != 0)
+ mgmtObject->debugStats("destroying");
+}
+
void Vhost::setFederationTag(const std::string& tag)
{
mgmtObject->set_federationTag(tag);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Vhost.h Fri Sep 20 18:59:30 2013
@@ -40,6 +40,8 @@ class Vhost : public management::Managea
Vhost (management::Manageable* parentBroker, Broker* broker = 0);
+ ~Vhost ();
+
management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
void setFederationTag(const std::string& tag);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -182,12 +182,9 @@ void Connection::open()
void Connection::readPeerProperties()
{
- /**
- * TODO: enable when proton 0.5 has been released:
qpid::types::Variant::Map properties;
DataReader::read(pn_connection_remote_properties(connection), properties);
setPeerProperties(properties);
- */
}
void Connection::closed()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp Fri Sep 20 18:59:30 2013
@@ -160,7 +160,7 @@ void DataReader::readArray(pn_data_t* /*
void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
{
size_t count = pn_data_get_list(data);
- bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
+ bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor);
if (!skip) {
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
@@ -174,7 +174,7 @@ void DataReader::readList(pn_data_t* dat
void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
{
size_t count = pn_data_get_map(data);
- reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor);
+ reader.onStartMap(count, qpid::amqp::CharSequence(), qpid::amqp::CharSequence(), descriptor);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
read(data);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Fri Sep 20 18:59:30 2013
@@ -25,6 +25,7 @@
#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
namespace qpid {
namespace broker {
@@ -104,7 +105,7 @@ namespace {
}
DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
- : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
+ : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()), expiryPolicy(broker.getExpiryPolicy()) {}
DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
@@ -116,6 +117,7 @@ void DecodingIncoming::readable(pn_deliv
qpid::broker::Message message(received, received);
userid.verify(message.getUserId());
+ message.computeExpiration(expiryPolicy);
handle(message);
--window;
received->begin();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Incoming.h Fri Sep 20 18:59:30 2013
@@ -77,6 +77,7 @@ class DecodingIncoming : public Incoming
virtual void handle(qpid::broker::Message&) = 0;
private:
boost::shared_ptr<Session> session;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Sep 20 18:59:30 2013
@@ -21,9 +21,13 @@
#include "Message.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/descriptors.h"
-#include "qpid/amqp/Reader.h"
-#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/Reader.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/Buffer.h"
#include <string.h>
@@ -121,8 +125,8 @@ namespace {
}
}
- bool onStartList(uint32_t, const CharSequence&, const Descriptor*) { return false; }
- bool onStartMap(uint32_t, const CharSequence&, const Descriptor*) { return false; }
+ bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; }
+ bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) { return false; }
bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*) { return false; }
public:
@@ -144,8 +148,12 @@ void Message::processProperties(MapHandl
//and whether it should indeed only be the content that is thus
//measured
uint64_t Message::getContentSize() const { return data.size(); }
-//getContent() is used primarily for decoding qmf messages in management and ha
-std::string Message::getContent() const { return empty; }
+//getContent() is used primarily for decoding qmf messages in
+//management and ha, but also by the xml exchange
+std::string Message::getContent() const
+{
+ return std::string(body.data, body.size);
+}
Message::Message(size_t size) : data(size)
{
@@ -253,12 +261,54 @@ void Message::onGroupId(const qpid::amqp
void Message::onGroupSequence(uint32_t) {}
void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
-void Message::onApplicationProperties(const qpid::amqp::CharSequence& v) { applicationProperties = v; }
-void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
-void Message::onMessageAnnotations(const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
-void Message::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) { body = v; }
-void Message::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
-void Message::onFooter(const qpid::amqp::CharSequence& v) { footer = v; }
+void Message::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { applicationProperties = v; }
+void Message::onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { deliveryAnnotations = v; }
+void Message::onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { messageAnnotations = v; }
+
+void Message::onData(const qpid::amqp::CharSequence& v) { body = v; }
+void Message::onAmqpSequence(const qpid::amqp::CharSequence& v) { body = v; bodyType = qpid::amqp::typecodes::LIST_NAME; }
+void Message::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& t)
+{
+ body = v;
+ if (t == qpid::amqp::typecodes::STRING_NAME) {
+ bodyType = qpid::types::encodings::UTF8;
+ } else if (t == qpid::amqp::typecodes::SYMBOL_NAME) {
+ bodyType = qpid::types::encodings::ASCII;
+ } else if (t == qpid::amqp::typecodes::BINARY_NAME) {
+ bodyType = qpid::types::encodings::BINARY;
+ } else {
+ bodyType = t;
+ }
+}
+void Message::onAmqpValue(const qpid::types::Variant& v) { typedBody = v; }
+
+void Message::onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence& v) { footer = v; }
+
+bool Message::isTypedBody() const
+{
+ return !typedBody.isVoid() || !bodyType.empty();
+}
+
+qpid::types::Variant Message::getTypedBody() const
+{
+ if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ return builder.getList();
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::MapBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ return builder.getMap();
+ } else if (!bodyType.empty()) {
+ qpid::types::Variant value(std::string(body.data, body.size));
+ value.setEncoding(bodyType);
+ return value;
+ } else {
+ return typedBody;
+ }
+}
//PersistableMessage interface:
@@ -292,31 +342,41 @@ void Message::decodeHeader(framing::Buff
}
void Message::decodeContent(framing::Buffer& /*buffer*/) {}
-boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& added) const
{
//message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations
- size_t extra = 0;
+ std::map<std::string, qpid::types::Variant> combined;
+ const std::map<std::string, qpid::types::Variant>* annotations(0);
if (messageAnnotations) {
- //TODO: actual merge required
+ //combine existing and added annotations (TODO: this could be
+ //optimised by avoiding the decode and simply 'editing' the
+ //size and count in the raw data, then appending the new
+ //elements).
+ qpid::amqp::MapBuilder builder;
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ decoder.read(builder);
+ combined = builder.getMap();
+ for (std::map<std::string, qpid::types::Variant>::const_iterator i = added.begin(); i != added.end(); ++i) {
+ combined[i->first] = i->second;
+ }
+ annotations = &combined;
} else {
- //add whole new section
- extra = qpid::amqp::MessageEncoder::getEncodedSize(annotations, true);
+ //additions form a whole new section
+ annotations = &added;
}
- boost::intrusive_ptr<Message> copy(new Message(data.size()+extra));
+ size_t annotationsSize = qpid::amqp::MessageEncoder::getEncodedSize(*annotations, true) + 3/*descriptor*/;
+
+ boost::intrusive_ptr<Message> copy(new Message(bareMessage.size+footer.size+deliveryAnnotations.size+annotationsSize));
size_t position(0);
- if (deliveryAnnotations) {
+ if (deliveryAnnotations.size) {
::memcpy(©->data[position], deliveryAnnotations.data, deliveryAnnotations.size);
position += deliveryAnnotations.size;
}
- if (messageAnnotations) {
- //TODO: actual merge required
- ::memcpy(©->data[position], messageAnnotations.data, messageAnnotations.size);
- position += messageAnnotations.size;
- } else {
- qpid::amqp::MessageEncoder encoder(©->data[position], extra);
- encoder.writeMap(annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true);
- position += extra;
- }
+
+ qpid::amqp::Encoder encoder(©->data[position], annotationsSize);
+ encoder.writeMap(*annotations, &qpid::amqp::message::MESSAGE_ANNOTATIONS, true);
+ position += encoder.getPosition();
+
if (bareMessage) {
::memcpy(©->data[position], bareMessage.data, bareMessage.size);
position += bareMessage.size;
@@ -325,7 +385,18 @@ boost::intrusive_ptr<PersistableMessage>
::memcpy(©->data[position], footer.data, footer.size);
position += footer.size;
}
+ copy->data.resize(position);//annotationsSize may be slightly bigger than needed if optimisations are used (e.g. smallint)
copy->scan();
+ {
+ qpid::amqp::MapBuilder builder;
+ qpid::amqp::Decoder decoder(copy->messageAnnotations.data, copy->messageAnnotations.size);
+ decoder.read(builder);
+ QPID_LOG(notice, "Merged annotations are now: " << builder.getMap() << " raw=" << std::hex << std::string(copy->messageAnnotations.data, copy->messageAnnotations.size) << " " << copy->messageAnnotations.size << " bytes");
+ }
+ assert(copy->messageAnnotations);
+ assert(copy->bareMessage.size == bareMessage.size);
+ assert(copy->footer.size == footer.size);
+ assert(copy->deliveryAnnotations.size == deliveryAnnotations.size);
return copy;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Sep 20 18:59:30 2013
@@ -64,6 +64,8 @@ class Message : public qpid::broker::Mes
qpid::amqp::CharSequence getBareMessage() const;
qpid::amqp::CharSequence getBody() const;
qpid::amqp::CharSequence getFooter() const;
+ bool isTypedBody() const;
+ qpid::types::Variant getTypedBody() const;
Message(size_t size);
char* getData();
@@ -109,6 +111,8 @@ class Message : public qpid::broker::Mes
//body:
qpid::amqp::CharSequence body;
+ qpid::types::Variant typedBody;
+ std::string bodyType;
//footer:
qpid::amqp::CharSequence footer;
@@ -136,12 +140,16 @@ class Message : public qpid::broker::Mes
void onGroupSequence(uint32_t);
void onReplyToGroupId(const qpid::amqp::CharSequence&);
- void onApplicationProperties(const qpid::amqp::CharSequence&);
- void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
- void onMessageAnnotations(const qpid::amqp::CharSequence&);
- void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
- void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
- void onFooter(const qpid::amqp::CharSequence&);
+ void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+ void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+ void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+
+ void onData(const qpid::amqp::CharSequence&);
+ void onAmqpSequence(const qpid::amqp::CharSequence&);
+ void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+ void onAmqpValue(const qpid::types::Variant&);
+
+ void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Fri Sep 20 18:59:30 2013
@@ -29,6 +29,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -44,14 +45,16 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
- Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+ Consumer(pn_link_name(l), type),
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
- buffer(1024)/*used only for header at present*/
+ buffer(1024)/*used only for header at present*/,
+ unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -91,8 +94,7 @@ void OutgoingFromQueue::write(const char
void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
- pn_delivery_tag_t tag = pn_delivery_tag(delivery);
- size_t i = *reinterpret_cast<const size_t*>(tag.bytes);
+ size_t i = Record::getIndex(pn_delivery_tag(delivery));
Record& r = deliveries[i];
if (pn_delivery_writable(delivery)) {
assert(r.msg);
@@ -104,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delive
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
+ if (unreliable) pn_delivery_settle(delivery);
if (pn_link_advance(link)) {
--outstanding;
outgoingMessageSent();
@@ -112,26 +115,28 @@ void OutgoingFromQueue::handle(pn_delive
QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
}
}
- if (pn_delivery_updated(delivery)) {
+ if (unreliable) {
+ if (preAcquires()) queue->dequeue(0, r.cursor);
+ r.reset();
+ } else if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
if (r.disposition) {
switch (r.disposition) {
case PN_ACCEPTED:
- //TODO: only if consuming
- queue->dequeue(0, r.cursor);
+ if (preAcquires()) queue->dequeue(0, r.cursor);
outgoingMessageAccepted();
break;
case PN_REJECTED:
- queue->reject(r.cursor);
+ if (preAcquires()) queue->reject(r.cursor);
outgoingMessageRejected();
break;
case PN_RELEASED:
- queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
- queue->release(r.cursor, true);//TODO: proper handling of modified
+ if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
outgoingMessageRejected();//TODO: not quite true...
break;
default:
@@ -251,12 +256,17 @@ qpid::broker::OwnershipToken* OutgoingFr
return 0;
}
-OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {}
+OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0)
+{
+ tag.bytes = tagData;
+ tag.size = TAG_WIDTH;
+}
void OutgoingFromQueue::Record::init(size_t i)
{
index = i;
- tag.bytes = reinterpret_cast<const char*>(&index);
- tag.size = sizeof(index);
+ qpid::framing::Buffer buffer(tagData, tag.size);
+ assert(index <= std::numeric_limits<uint32_t>::max());
+ buffer.putLong(index);
}
void OutgoingFromQueue::Record::reset()
{
@@ -266,5 +276,13 @@ void OutgoingFromQueue::Record::reset()
disposition = 0;
}
+size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
+{
+ assert(t.size == TAG_WIDTH);
+ qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size);
+ return (size_t) buffer.getLong();
+}
+
+
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Fri Sep 20 18:59:30 2013
@@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingL
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
@@ -117,10 +118,17 @@ class OutgoingFromQueue : public Outgoin
int disposition;
size_t index;
pn_delivery_tag_t tag;
+ //The delivery tag is a 4 byte value representing the
+ //index. It is encoded separately to avoid alignment issues.
+ //The number of deliveries held here is always strictly
+ //bounded, so 4 bytes is more than enough.
+ static const size_t TAG_WIDTH = sizeof(uint32_t);
+ char tagData[TAG_WIDTH];
Record();
void init(size_t i);
void reset();
+ static size_t getIndex(pn_delivery_tag_t);
};
const bool exclusive;
@@ -134,6 +142,7 @@ class OutgoingFromQueue : public Outgoin
std::vector<char> buffer;
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;
+ bool unreliable;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Fri Sep 20 18:59:30 2013
@@ -105,14 +105,14 @@ void Relay::detached(Outgoing*)
{
out = 0;
isDetached = true;
- std::cerr << "Outgoing link detached from relay" << std::endl;
+ QPID_LOG(info, "Outgoing link detached from relay [" << this << "]");
if (in) in->wakeup();
}
void Relay::detached(Incoming*)
{
in = 0;
isDetached = true;
- std::cerr << "Incoming link detached from relay" << std::endl;
+ QPID_LOG(info, "Incoming link detached from relay [" << this << "]");
if (out) out->wakeup();
}
@@ -139,13 +139,13 @@ void OutgoingFromRelay::handle(pn_delive
if (pn_delivery_writable(delivery)) {
if (transfer->write(link)) {
outgoingMessageSent();
- QPID_LOG(debug, "Sent relayed message " << name);
+ QPID_LOG(debug, "Sent relayed message " << name << " [" << relay.get() << "]");
} else {
- QPID_LOG(error, "Failed to send relayed message " << name);
+ QPID_LOG(error, "Failed to send relayed message " << name << " [" << relay.get() << "]");
}
}
if (pn_delivery_updated(delivery)) {
- pn_disposition_t d = transfer->updated();
+ uint64_t d = transfer->updated();
switch (d) {
case PN_ACCEPTED:
outgoingMessageAccepted();
@@ -226,6 +226,7 @@ void IncomingToRelay::detached()
relay->detached(this);
}
+BufferedTransfer::BufferedTransfer() : disposition(0) {}
void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d)
{
in.handle = d;
@@ -264,7 +265,7 @@ void BufferedTransfer::initOut(pn_link_t
pn_delivery_set_context(out.handle, this);
}
-pn_disposition_t BufferedTransfer::updated()
+uint64_t BufferedTransfer::updated()
{
disposition = pn_delivery_remote_state(out.handle);
if (disposition) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Relay.h Fri Sep 20 18:59:30 2013
@@ -45,10 +45,11 @@ struct Delivery
class BufferedTransfer
{
public:
+ BufferedTransfer();
void initIn(pn_link_t* link, pn_delivery_t* d);
bool settle();
void initOut(pn_link_t* link);
- pn_disposition_t updated();
+ uint64_t updated();
bool write(pn_link_t*);
private:
std::vector<char> data;
@@ -56,7 +57,7 @@ class BufferedTransfer
Delivery out;
pn_delivery_tag_t dt;
std::vector<char> tag;
- pn_disposition_t disposition;
+ uint64_t disposition;
};
/**
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp Fri Sep 20 18:59:30 2013
@@ -36,7 +36,7 @@ namespace amqp {
SaslClient::SaslClient(qpid::sys::OutputControl& out_, const std::string& id, boost::shared_ptr<Interconnect> c, std::auto_ptr<qpid::Sasl> s,
const std::string& hostname_, const std::string& mechs, const qpid::sys::SecuritySettings& t)
: qpid::amqp::SaslClient(id), out(out_), connection(c), sasl(s),
- hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(false), haveOutput(false), state(NONE) {}
+ hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(false), haveOutput(false), initialised(false), state(NONE) {}
SaslClient::~SaslClient()
{
@@ -67,8 +67,10 @@ std::size_t SaslClient::encode(char* buf
encoded += writeProtocolHeader(buffer, size);
writeHeader = !encoded;
}
- if (state == NONE && encoded < size) {
- encoded += write(buffer + encoded, size - encoded);
+ if ((!initialised || state == NONE) && encoded < size) {
+ size_t extra = write(buffer + encoded, size - encoded);
+ encoded += extra;
+ initialised = extra > 0;
} else if (state == SUCCEEDED) {
if (securityLayer.get()) encoded += securityLayer->encode(buffer + encoded, size - encoded);
else encoded += connection->encode(buffer + encoded, size - encoded);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/SaslClient.h Fri Sep 20 18:59:30 2013
@@ -64,6 +64,7 @@ class SaslClient : public qpid::sys::Con
bool readHeader;
bool writeHeader;
bool haveOutput;
+ bool initialised;
enum {
NONE, FAILED, SUCCEEDED
} state;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Sep 20 18:59:30 2013
@@ -36,6 +36,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
@@ -56,15 +57,16 @@ namespace broker {
namespace amqp {
namespace {
-bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+pn_bytes_t convert(const std::string& s)
{
- pn_data_rewind(capabilities);
- while (pn_data_next(capabilities)) {
- pn_bytes_t c = pn_data_get_symbol(capabilities);
- std::string s(c.start, c.size);
- if (s == name) return true;
- }
- return false;
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+std::string convert(pn_bytes_t in)
+{
+ return std::string(in.start, in.size);
}
//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
@@ -75,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
const std::string SHARED("shared");
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
- pn_data_put_symbol(out, c);
+ if (supported.size() == 1) {
+ pn_data_put_symbol(out, convert(supported.front()));
+ } else if (supported.size() > 1) {
+ pn_data_put_array(out, false, PN_SYMBOL);
+ pn_data_enter(out);
+ for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
+ pn_data_put_symbol(out, convert(*i));
+ }
+ pn_data_exit(out);
+ }
+}
+
+template <class F>
+void readCapabilities(pn_data_t* data, F f)
+{
+ pn_data_rewind(data);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ f(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ f(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
}
}
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+void matchCapability(const std::string& name, bool* result, const std::string& s)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == SHARED) {
- pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
- pn_data_put_symbol(out, c);
- } else if (s == DIRECT_FILTER) {
- if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
- } else if (s == TOPIC_FILTER) {
- if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
- }
+ if (s == name) *result = true;
+}
+
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+ bool result(false);
+ readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
+ return result;
+}
+
+void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+ supported->push_back(s);
}
}
+void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == SHARED) {
+ supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+ supported->push_back(s);
+ } else if (s == DIRECT_FILTER) {
+ if (node->getType() == DirectExchange::typeName) supported->push_back(s);
+ } else if (s == TOPIC_FILTER) {
+ if (node->getType() == TopicExchange::typeName) supported->push_back(s);
+ }
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
}
class IncomingToQueue : public DecodingIncoming
@@ -149,6 +201,12 @@ Session::ResolvedNode Session::resolve(c
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
if (node.topic) node.exchange = node.topic->getExchange();
+ if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+ node.properties.read(pn_terminus_properties(terminus));
+ if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
+ }
+ }
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
@@ -316,10 +374,10 @@ void Session::setupOutgoing(pn_link_t* l
target = targetAddress;
}
-
if (node.queue) {
authorise.outgoing(node.queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink()));
+ SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -333,8 +391,12 @@ void Session::setupOutgoing(pn_link_t* l
settings.durable = durable;
settings.autodelete = !durable;
}
+ settings.autoDeleteDelay = pn_terminus_get_timeout(source);
+ if (settings.autoDeleteDelay) {
+ settings.autodelete = true;
+ settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+ }
filter.configure(settings);
- //TODO: populate settings from source details when available from engine
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
@@ -350,9 +412,9 @@ void Session::setupOutgoing(pn_link_t* l
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false));
- outgoing[link] = q;
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
q->init();
+ outgoing[link] = q;
} else if (node.relay) {
boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
outgoing[link] = out;
@@ -503,7 +565,6 @@ bool Session::dispatch()
void Session::close()
{
- exclusiveQueues.clear();
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
i->second->detached();
}
@@ -516,6 +577,7 @@ void Session::close()
for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) {
(*i)->releaseExclusiveOwnership();
}
+ exclusiveQueues.clear();
qpid::sys::Mutex::ScopedLock l(lock);
deleted = true;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Fri Sep 20 18:59:30 2013
@@ -47,6 +47,13 @@ bool testProperty(const std::string& k,
else return i->second;
}
+qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties)
+{
+ qpid::types::Variant::Map filtered = properties;
+ filtered.erase(DURABLE);
+ filtered.erase(EXCHANGE);
+ return filtered;
+}
}
Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
@@ -60,7 +67,7 @@ Topic::Topic(Broker& broker, const std::
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent != 0) {
topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
- topic->set_properties(policy.asMap());
+ topic->set_properties(filter(properties));
agent->addObject(topic);
}
}
@@ -117,8 +124,12 @@ bool TopicRegistry::deleteObject(Broker&
{
if (type == TOPIC) {
boost::shared_ptr<Topic> topic = remove(name);
- if (topic->isDurable()) broker.getStore().destroy(*topic);
- return true;
+ if (topic) {
+ if (topic->isDurable()) broker.getStore().destroy(*topic);
+ return true;
+ } else {
+ return false;
+ }
} else {
return false;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Sep 20 18:59:30 2013
@@ -27,6 +27,7 @@
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
@@ -38,6 +39,9 @@ namespace {
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
+const std::string TEXT_PLAIN("text/plain");
+const std::string SUBJECT_KEY("qpid.subject");
+const std::string APP_ID("x-amqp-0-10.app-id");
qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
{
@@ -98,8 +102,25 @@ class Properties_0_10 : public qpid::amq
std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
bool hasTo() const { return getDestination().size() || hasSubject(); }
std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
- bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
- std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+ bool hasSubject() const
+ {
+ if (getDestination().empty()) {
+ return getApplicationProperties().isSet(SUBJECT_KEY);
+ } else {
+ return deliveryProperties && deliveryProperties->hasRoutingKey();
+ }
+ }
+ std::string getSubject() const
+ {
+ if (getDestination().empty()) {
+ //message was sent to default exchange, routing key is the queue name
+ return getApplicationProperties().getAsString(SUBJECT_KEY);
+ } else if (deliveryProperties) {
+ return deliveryProperties->getRoutingKey();
+ } else {
+ return EMPTY;
+ }
+ }
bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
@@ -119,7 +140,7 @@ class Properties_0_10 : public qpid::amq
bool hasReplyToGroupId() const { return false; }
std::string getReplyToGroupId() const { return EMPTY; }
- const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+ const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); }
Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
@@ -138,7 +159,6 @@ class Properties_0_10 : public qpid::amq
Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
-
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
@@ -161,13 +181,38 @@ boost::intrusive_ptr<const qpid::broker:
transfer->getFrames().append(method);
transfer->getFrames().append(header);
- qpid::amqp::CharSequence body = message->getBody();
- content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
- transfer->getFrames().append(content);
-
qpid::framing::MessageProperties* props =
transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
- props->setContentLength(body.size);
+
+ if (message->isTypedBody()) {
+ qpid::types::Variant body = message->getTypedBody();
+ std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
+ if (body.getType() == qpid::types::VAR_MAP) {
+ qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
+ props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_LIST) {
+ qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_STRING) {
+ data = body.getString();
+ if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
+ props->setContentType(TEXT_PLAIN);
+ }
+ } else {
+ qpid::types::Variant::List container;
+ container.push_back(body);
+ qpid::amqp_0_10::ListCodec::encode(container, data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ }
+ transfer->getFrames().append(content);
+ props->setContentLength(data.size());
+ } else {
+ qpid::amqp::CharSequence body = message->getBody();
+ content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+ transfer->getFrames().append(content);
+
+ props->setContentLength(body.size);
+ }
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
@@ -209,13 +254,25 @@ boost::intrusive_ptr<const qpid::broker:
if (ap) {
qpid::amqp::Decoder d(ap.data, ap.size);
qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
+ std::string appid = props->getApplicationHeaders().getAsString(APP_ID);
+ if (!appid.empty()) {
+ props->setAppId(appid);
+ }
}
qpid::framing::DeliveryProperties* dp =
transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
dp->setPriority(message->getPriority());
if (message->isPersistent()) dp->setDeliveryMode(2);
- if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+ if (message->getRoutingKey().size()) {
+ if (message->getRoutingKey().size() > std::numeric_limits<uint8_t>::max()) {
+ //have to truncate routing key as it is specified to be a str8
+ dp->setRoutingKey(message->getRoutingKey().substr(0,std::numeric_limits<uint8_t>::max()));
+ } else {
+ dp->setRoutingKey(message->getRoutingKey());
+ }
+ props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
+ }
return transfer.get();
} else {
@@ -226,10 +283,11 @@ boost::intrusive_ptr<const qpid::broker:
void Translation::write(OutgoingFromQueue& out)
{
- const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+ const Message* message = dynamic_cast<const Message*>(original.getPersistentContext().get());
+ //persistent context will contain any newly added annotations
+ if (!message) message = dynamic_cast<const Message*>(&original.getEncoding());
if (message) {
//write annotations
- //TODO: merge in any newly added annotations
qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations();
qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations();
if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size);
@@ -246,14 +304,40 @@ void Translation::write(OutgoingFromQueu
Properties_0_10 properties(*transfer);
qpid::types::Variant::Map applicationProperties;
qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
- std::string content = transfer->getContent();
- size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
- std::vector<char> buffer(size);
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeProperties(properties);
- encoder.writeApplicationProperties(applicationProperties);
- if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
- out.write(&buffer[0], encoder.getPosition());
+ if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ qpid::types::Variant::Map content;
+ qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ qpid::types::Variant::List content;
+ qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else {
+ std::string content = transfer->getContent();
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
+ out.write(&buffer[0], encoder.getPosition());
+ }
} else {
QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -174,6 +174,7 @@ void Connection::requestIOProcessing(boo
Connection::~Connection()
{
if (mgmtObject != 0) {
+ mgmtObject->debugStats("destroying");
if (!link)
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, getUserId(), mgmtObject->get_remoteProperties()));
QPID_LOG_CAT(debug, model, "Delete connection. user:" << getUserId()
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -127,7 +127,7 @@ void Connection::open(const ConnectionSe
impl->registerFailureCallback ( failureCallback );
}
-const ConnectionSettings& Connection::getNegotiatedSettings()
+const ConnectionSettings& Connection::getNegotiatedSettings() const
{
if (!isOpen())
throw Exception(QPID_MSG("Connection is not open."));
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Fri Sep 20 18:59:30 2013
@@ -50,6 +50,7 @@ using qpid::sys::Mutex;
namespace {
const std::string OK("OK");
const std::string PLAIN("PLAIN");
+const std::string ANONYMOUS("ANONYMOUS");
const std::string en_US("en_US");
const std::string INVALID_STATE_START("start received in invalid state");
@@ -244,6 +245,7 @@ void ConnectionHandler::start(const Fiel
std::vector<std::string> mechlist;
mechlist.reserve(mechanisms.size());
+
if (mechanism.empty()) {
//mechlist is simply what the server offers
std::transform(mechanisms.begin(), mechanisms.end(), std::back_inserter(mechlist), Array::get<std::string, Array::ValuePtr>);
@@ -273,9 +275,25 @@ void ConnectionHandler::start(const Fiel
proxy.send(body);
}
} else {
- //TODO: verify that desired mechanism and locale are supported
- std::string response = ((char)0) + username + ((char)0) + password;
- proxy.startOk(properties, mechanism, response, locale);
+ bool haveAnonymous(false);
+ bool havePlain(false);
+ for (std::vector<std::string>::const_iterator i = mechlist.begin(); i != mechlist.end(); ++i) {
+ if (*i == ANONYMOUS) {
+ haveAnonymous = true;
+ break;
+ } else if (*i == PLAIN) {
+ havePlain = true;
+ }
+ }
+ if (haveAnonymous && (mechanism.empty() || mechanism.find(ANONYMOUS) != std::string::npos)) {
+ proxy.startOk(properties, ANONYMOUS, username, locale);
+ } else if (havePlain && (mechanism.empty() || mechanism.find(PLAIN) !=std::string::npos)) {
+ std::string response = ((char)0) + username + ((char)0) + password;
+ proxy.startOk(properties, PLAIN, response, locale);
+ } else {
+ if (!mechanism.empty()) throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << "; client supports PLAIN or ANONYMOUS, broker supports: " << join(mechlist)));
+ throw Exception(QPID_MSG("No valid mechanism; client supports PLAIN or ANONYMOUS, broker supports: " << join(mechlist)));
+ }
}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Fri Sep 20 18:59:30 2013
@@ -89,6 +89,7 @@ const std::string NODE("node");
const std::string LINK("link");
const std::string MODE("mode");
const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
const std::string NAME("name");
const std::string DURABLE("durable");
const std::string X_DECLARE("x-declare");
@@ -240,8 +241,8 @@ class Subscription : public Exchange, pu
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
const std::string queue;
- const bool reliable;
const bool durable;
+ const bool reliable;
const std::string actualType;
const bool exclusiveQueue;
const bool exclusiveSubscription;
@@ -516,13 +517,25 @@ std::string Subscription::getSubscriptio
Subscription::Subscription(const Address& address, const std::string& type)
: Exchange(address),
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
- reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
+ //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+ //if not assume it is unreliable unless explicitly stated otherwise
+ reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
+ const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+ if (timeout) {
+ if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+ } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
+ //if durable but not explicitly reliable, then set a non-zero
+ //default for the autodelete timeout (previously this would
+ //have defaulted to autodelete immediately anyway, so the risk
+ //of the change causing problems is mitigated)
+ queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+ }
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
std::string selector = Opt(address)/LINK/SELECTOR;
@@ -584,7 +597,7 @@ void Subscription::subscribe(qpid::clien
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
- arg::autoDelete=!reliable, arg::durable=durable,
+ arg::autoDelete=!(durable || reliable), arg::durable=durable,
arg::alternateExchange=alternateExchange,
arg::arguments=queueOptions);
//'default' binding:
@@ -997,6 +1010,7 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Sep 20 18:59:30 2013
@@ -43,6 +43,7 @@ using qpid::framing::Uuid;
namespace {
const std::string TCP("tcp");
+const std::string COLON(":");
double FOREVER(std::numeric_limits<double>::max());
// Time values in seconds can be specified as integer or floating point values.
@@ -86,9 +87,9 @@ bool expired(const sys::AbsTime& start,
} // namespace
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
minReconnectInterval(0.001), maxReconnectInterval(2),
- retries(0), reconnectOnLimitExceeded(true)
+ retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
{
setOptions(options);
urls.insert(urls.begin(), url);
@@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std
{
sys::Mutex::ScopedLock l(lock);
if (name == "reconnect") {
- reconnect = value;
+ autoReconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
@@ -157,8 +158,10 @@ void ConnectionImpl::setOption(const std
settings.sslCertName = value.asString();
} else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
reconnectOnLimitExceeded = value;
- } else if (name == "client-properties") {
+ } else if (name == "client-properties" || name == "client_properties") {
amqp_0_10::translate(value.asMap(), settings.clientProperties);
+ } else if (name == "disable-auto-decode" || name == "disable_auto_decode") {
+ disableAutoDecode = value;
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
@@ -254,7 +257,7 @@ void ConnectionImpl::open()
void ConnectionImpl::reopen()
{
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
open();
@@ -265,7 +268,7 @@ void ConnectionImpl::connect(const qpid:
{
QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
if (limit >= 0 && retries++ >= limit) {
@@ -341,9 +344,52 @@ bool ConnectionImpl::backoff()
}
}
+void ConnectionImpl::reconnect(const std::string& u)
+{
+ sys::Mutex::ScopedLock l(lock);
+ try {
+ QPID_LOG(info, "Trying to connect to " << u << "...");
+ Url url(u, settings.protocol.size() ? settings.protocol : TCP);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
+ QPID_LOG(info, "Connected to " << u);
+ mergeUrls(connection.getInitialBrokers(), l);
+ if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ throw qpid::messaging::TransportFailure(e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+ throw qpid::messaging::MessagingException(e.what());
+ }
+}
+
+void ConnectionImpl::reconnect()
+{
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Could not reconnect");
+ }
+}
+std::string ConnectionImpl::getUrl() const
+{
+ if (isOpen()) {
+ std::stringstream u;
+ u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port;
+ return u.str();
+ } else {
+ return std::string();
+ }
+}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
}
+bool ConnectionImpl::getAutoDecode() const
+{
+ return !disableAutoDecode;
+}
+
}}} // namespace qpid::client::amqp0_10
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org