You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/06 22:27:36 UTC
svn commit: r573359 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/
src/ src/qpid/broker/ src/qpid/client/ src/qpid/framing/ src/tests/
Author: gsim
Date: Thu Sep 6 13:27:33 2007
New Revision: 573359
URL: http://svn.apache.org/viewvc?rev=573359&view=rev
Log:
Implementation of execution.result on the client side
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
- copied, changed from r572247, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
- copied, changed from r572394, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Thu Sep 6 13:27:33 2007
@@ -12,8 +12,18 @@
@classname="Session"
end
+ def return_type(m)
+ if (m.result)
+ return "TypedResult<qpid::framing::#{m.result.cpptype.ret}>"
+ elsif (not m.responses().empty?)
+ return "Response"
+ else
+ return "Completion"
+ end
+ end
+
def declare_method (m)
- gen "Response #{m.parent.name.lcaps}#{m.name.caps}("
+ gen "#{return_type(m)} #{m.parent.name.lcaps}#{m.name.caps}("
if (m.content())
params=m.signature + ["const MethodContent& content"]
else
@@ -28,7 +38,7 @@
end
def define_method (m)
- gen "Response Session::#{m.parent.name.lcaps}#{m.name.caps}("
+ gen "#{return_type(m)} Session::#{m.parent.name.lcaps}#{m.name.caps}("
if (m.content())
params=m.signature + ["const MethodContent& content"]
else
@@ -37,19 +47,15 @@
indent { gen params.join(",\n") }
gen "){\n\n"
indent (2) {
- gen "return impl->send(#{m.body_name}("
+ gen "return #{return_type(m)}(impl()->send(#{m.body_name}("
params = ["version"] + m.param_names
gen params.join(", ")
other_params=[]
if (m.content())
- other_params << "content"
- end
- if m.responses().empty?
- other_params << "false"
- else
- other_params << "true"
+ gen "), content), impl());\n"
+ else
+ gen ")), impl());\n"
end
- gen "), #{other_params.join(", ")});\n"
}
gen "}\n\n"
end
@@ -65,11 +71,13 @@
gen <<EOS
#include <sstream>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/amqp_structs.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Response.h"
-#include "qpid/client/SessionCore.h"
+#include "qpid/client/ScopedAssociation.h"
+#include "qpid/client/TypedResult.h"
namespace qpid {
namespace client {
@@ -81,16 +89,20 @@
using framing::SequenceNumberSet;
class #{@classname} {
- ConnectionImpl::shared_ptr parent;
- SessionCore::shared_ptr impl;
+ ScopedAssociation::shared_ptr assoc;
framing::ProtocolVersion version;
+
+ SessionCore::shared_ptr impl();
+
public:
- #{@classname}(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
- ~#{@classname}();
+ #{@classname}();
+ #{@classname}(ScopedAssociation::shared_ptr);
- framing::FrameSet::shared_ptr get() { return impl->get(); }
- void setSynchronous(bool sync) { impl->setSync(sync); }
+ framing::FrameSet::shared_ptr get() { return impl()->get(); }
+ void setSynchronous(bool sync) { impl()->setSync(sync); }
void close();
+ Execution& execution() { return impl()->getExecution(); }
+
EOS
indent { @amqp.classes.each { |c| declare_class(c) if !excludes.include?(c.name) } }
gen <<EOS
@@ -112,24 +124,18 @@
namespace qpid {
namespace client {
-#{@classname}::#{@classname}(ConnectionImpl::shared_ptr _parent, SessionCore::shared_ptr _impl) : parent(_parent), impl(_impl) {}
+#{@classname}::#{@classname}() {}
+#{@classname}::#{@classname}(ScopedAssociation::shared_ptr _assoc) : assoc(_assoc) {}
-#{@classname}::~#{@classname}()
+SessionCore::shared_ptr #{@classname}::impl()
{
- impl->stop();
- if (parent) {
- parent->released(impl);
- parent.reset();
- }
+ if (!assoc) throw Exception("Uninitialised session");
+ return assoc->session;
}
void #{@classname}::close()
{
- impl->close();
- if (parent) {
- parent->released(impl);
- parent.reset();
- }
+ impl()->close();
}
EOS
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Thu Sep 6 13:27:33 2007
@@ -117,19 +117,15 @@
end
def methodbody_extra_defs(s)
- if (s.content)
- content = "true"
- else
- content = "false"
- end
-
gen <<EOS
using AMQMethodBody::accept;
void accept(MethodBodyConstVisitor& v) const { v.visit(*this); }
inline ClassId amqpClassId() const { return CLASS_ID; }
inline MethodId amqpMethodId() const { return METHOD_ID; }
- inline bool isContentBearing() const { return #{content}; }
+ inline bool isContentBearing() const { return #{s.content ? "true" : "false" }; }
+ inline bool resultExpected() const { return #{s.result ? "true" : "false"}; }
+ inline bool responseExpected() const { return #{s.responses().empty? ? "false" : "true"}; }
EOS
end
@@ -201,7 +197,7 @@
#as result structs have types that are only unique to the
#class, they have a class dependent qualifier added to them
#(this is inline with current python code but a formal
- #solution is expected from the WG
+ #solution is expected from the WG)
indent { genl "static const uint16_t TYPE = #{s.type_} + #{s.parent.parent.parent.index} * 256;" }
else
indent { genl "static const uint16_t TYPE = #{s.type_};" }
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Sep 6 13:27:33 2007
@@ -98,6 +98,7 @@
libqpidcommon_la_SOURCES = \
$(rgen_common_cpp) \
$(platform_src) \
+ qpid/framing/AccumulatedAck.cpp \
qpid/framing/AMQBody.cpp \
qpid/framing/AMQMethodBody.cpp \
qpid/framing/AMQContentBody.cpp \
@@ -155,7 +156,6 @@
libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams
libqpidbroker_la_SOURCES = \
- qpid/broker/AccumulatedAck.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
@@ -218,14 +218,13 @@
qpid/client/ExecutionHandler.cpp \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResponse.cpp \
- qpid/client/FutureFactory.cpp \
+ qpid/client/FutureResult.cpp \
qpid/client/SessionCore.cpp \
qpid/client/StateManager.cpp
nobase_include_HEADERS = \
$(platform_hdr) \
- qpid/broker/AccumulatedAck.h \
qpid/broker/BrokerExchange.h \
qpid/broker/BrokerQueue.h \
qpid/broker/Consumer.h \
@@ -295,6 +294,7 @@
qpid/client/Connection.h \
qpid/client/ConnectionImpl.h \
qpid/client/Connector.h \
+ qpid/client/Completion.h \
qpid/client/MessageListener.h \
qpid/client/BlockingQueue.h \
qpid/client/Correlator.h \
@@ -302,13 +302,18 @@
qpid/client/ChannelHandler.h \
qpid/client/ChainableFrameHandler.h \
qpid/client/ConnectionHandler.h \
+ qpid/client/Execution.h \
qpid/client/ExecutionHandler.h \
+ qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResponse.h \
- qpid/client/FutureFactory.h \
+ qpid/client/FutureResult.h \
qpid/client/Response.h \
+ qpid/client/ScopedAssociation.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
+ qpid/client/TypedResult.h \
+ qpid/framing/AccumulatedAck.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQContentBody.h \
qpid/framing/AMQDataBlock.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu Sep 6 13:27:33 2007
@@ -171,7 +171,6 @@
queue->getSettings(),
queue->getMessageCount(),
queue->getConsumerCount());
-
}
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Sep 6 13:27:33 2007
@@ -65,7 +65,7 @@
return id > tag;
}
-bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
return range->covers(id);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Sep 6 13:27:33 2007
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "BrokerQueue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -56,7 +56,7 @@
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const AccumulatedAck* const range) const;
+ bool coveredBy(const framing::AccumulatedAck* const range) const;
void requeue() const;
void release();
void reject();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Thu Sep 6 13:27:33 2007
@@ -26,7 +26,7 @@
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Thu Sep 6 13:27:33 2007
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@
std::list<DeliveryRecord> pending;
public:
- DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Thu Sep 6 13:27:33 2007
@@ -22,7 +22,6 @@
*
*/
-#include "AccumulatedAck.h"
#include "Consumer.h"
#include "Deliverable.h"
#include "DeliveryAdapter.h"
@@ -35,6 +34,7 @@
#include "TxBuffer.h"
#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
@@ -116,7 +116,7 @@
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
- AccumulatedAck accumulatedAck;
+ framing::AccumulatedAck accumulatedAck;
bool opened;
bool flowActive;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Thu Sep 6 13:27:33 2007
@@ -25,6 +25,7 @@
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Thu Sep 6 13:27:33 2007
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@
* transactional channel.
*/
class TxAck : public TxOp{
- AccumulatedAck& acked;
+ framing::AccumulatedAck& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@
* acks received
* @param unacked the record of delivered messages
*/
- TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Sep 6 13:27:33 2007
@@ -56,7 +56,7 @@
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
+ uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
{
}
@@ -65,26 +65,25 @@
join();
}
-void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
+void Channel::open(const Session& s)
{
+ Mutex::ScopedLock l(lock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
-
- connection = c;
- sessionCore = s;
- session = auto_ptr<Session>(new Session(c, s));
+ active = true;
+ session = s;
}
bool Channel::isOpen() const {
Mutex::ScopedLock l(lock);
- return connection;
+ return active;
}
void Channel::setQos() {
- session->basicQos(0, getPrefetch(), false);
+ session.basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- session->txSelect();
+ session.txSelect();
}
}
@@ -95,13 +94,13 @@
void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
- ScopedSync s(*session, synch);
- session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
+ ScopedSync s(session, synch);
+ session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- ScopedSync s(*session, synch);
- session->exchangeDelete(0, exchange.getName(), false);
+ ScopedSync s(session, synch);
+ session.exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -112,30 +111,30 @@
}
FieldTable args;
- ScopedSync s(*session, synch);
- session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ ScopedSync s(session, synch);
+ session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), args);
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(*session, synch);
- session->queueDelete(0, queue.getName(), ifunused, ifempty);
+ ScopedSync s(session, synch);
+ session.queueDelete(0, queue.getName(), ifunused, ifempty);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- ScopedSync s(*session, synch);
- session->queueBind(0, q, e, key, args);
+ ScopedSync s(session, synch);
+ session.queueBind(0, q, e, key, args);
}
void Channel::commit(){
- session->txCommit();
+ session.txCommit();
}
void Channel::rollback(){
- session->txRollback();
+ session.txRollback();
}
void Channel::consume(
@@ -155,8 +154,8 @@
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- ScopedSync s(*session, synch);
- session->basicConsume(0, queue.getName(), tag, noLocal,
+ ScopedSync s(session, synch);
+ session.basicConsume(0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable());
}
@@ -171,13 +170,13 @@
c = i->second;
consumers.erase(i);
}
- ScopedSync s(*session, synch);
- session->basicCancel(tag);
+ ScopedSync s(session, synch);
+ session.basicCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
- sessionCore->flush();//TODO: need to expose the ability to request completion info through session
+ Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
+ session.execution().sendFlushRequest();
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -194,19 +193,15 @@
const string e = exchange.getName();
string key = routingKey;
- session->basicPublish(0, e, key, mandatory, immediate, msg);
+ session.basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::close()
{
- session->close();
+ session.close();
{
Mutex::ScopedLock l(lock);
- if (connection);
- {
- sessionCore.reset();
- connection.reset();
- }
+ active = false;
}
stop();
}
@@ -232,7 +227,7 @@
void Channel::run() {
try {
while (true) {
- FrameSet::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Sep 6 13:27:33 2007
@@ -79,18 +79,17 @@
bool running;
ConsumerMap consumers;
- ConnectionImpl::shared_ptr connection;
- std::auto_ptr<Session> session;
- SessionCore::shared_ptr sessionCore;
+ Session session;
framing::ChannelId channelId;
BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
+ bool active;
void stop();
void setQos();
- void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+ void open(const Session& session);
void closeInternal();
void join();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Sep 6 13:27:33 2007
@@ -25,6 +25,7 @@
#include "Connection.h"
#include "ClientChannel.h"
#include "ClientMessage.h"
+#include "ScopedAssociation.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -66,18 +67,15 @@
}
void Connection::openChannel(Channel& channel) {
- ChannelId id = ++channelIdCounter;
- SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- channel.open(impl, session);
- session->open();
+ channel.open(newSession());
}
Session Connection::newSession() {
ChannelId id = ++channelIdCounter;
SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- return Session(impl, session);
+ ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
+ session->open();
+ return Session(assoc);
}
void Connection::close()
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Completion_
+#define _Completion_
+
+#include <boost/shared_ptr.hpp>
+#include "Future.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Completion
+{
+protected:
+ Future future;
+ SessionCore::shared_ptr session;
+
+public:
+ Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {}
+
+ void sync()
+ {
+ future.sync(*session);
+ }
+
+ bool isComplete() {
+ return future.isComplete();
+ }
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Thu Sep 6 13:27:33 2007
@@ -20,45 +20,101 @@
*/
#include "CompletionTracker.h"
+#include <algorithm>
using qpid::client::CompletionTracker;
using namespace qpid::framing;
using namespace boost;
+namespace
+{
+const std::string empty;
+}
+
CompletionTracker::CompletionTracker() {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+void CompletionTracker::close()
+{
+ sys::Mutex::ScopedLock l(lock);
+ while (!listeners.empty()) {
+ Record r(listeners.front());
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ r.completed();
+ }
+ listeners.pop_front();
+ }
+}
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
- while (!listeners.empty() && !(listeners.front().first > mark)) {
- Listener f(listeners.front().second);
+ while (!listeners.empty() && !(listeners.front().id > mark)) {
+ Record r(listeners.front());
{
sys::Mutex::ScopedUnlock u(lock);
- f();
+ r.completed();
}
- listeners.pop();
+ listeners.pop_front();
+ }
+}
+
+void CompletionTracker::received(const SequenceNumber& id, const std::string& result)
+{
+ sys::Mutex::ScopedLock l(lock);
+ Listeners::iterator i = seek(id);
+ if (i != listeners.end() && i->id == id) {
+ i->received(result);
+ listeners.erase(i);
}
}
-void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener)
{
- if (!add(point, listener)) {
+ if (!add(Record(point, listener))) {
listener();
}
}
-bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener)
+{
+ if (!add(Record(point, listener))) {
+ listener(empty);
+ }
+}
+
+bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (point < mark) {
+ if (record.id < mark) {
return false;
} else {
- listeners.push(make_pair(point, listener));
+ //insert at the correct position
+ Listeners::iterator i = seek(record.id);
+ if (i == listeners.end()) i = listeners.begin();
+ listeners.insert(i, record);
+
return true;
}
}
+CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point)
+{
+ Listeners::iterator i = listeners.begin();
+ while (i != listeners.end() && i->id < point) i++;
+ return i;
+}
+
+void CompletionTracker::Record::completed()
+{
+ if (f) f();
+ else if(g) g(empty);//won't get a result if command is now complete
+}
+
+void CompletionTracker::Record::received(const std::string& result)
+{
+ if (g) g(result);
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Thu Sep 6 13:27:33 2007
@@ -19,7 +19,7 @@
*
*/
-#include <queue>
+#include <list>
#include <boost/function.hpp>
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,19 +34,40 @@
class CompletionTracker
{
public:
- typedef boost::function<void()> Listener;
+ //typedef boost::function<void()> CompletionListener;
+ typedef boost::function0<void> CompletionListener;
+ typedef boost::function<void(const std::string&)> ResultListener;
CompletionTracker();
CompletionTracker(const framing::SequenceNumber& mark);
void completed(const framing::SequenceNumber& mark);
- void listen(const framing::SequenceNumber& point, Listener l);
+ void received(const framing::SequenceNumber& id, const std::string& result);
+ void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l);
+ void listenForResult(const framing::SequenceNumber& point, ResultListener l);
+ void close();
private:
+ struct Record
+ {
+ framing::SequenceNumber id;
+ CompletionListener f;
+ ResultListener g;
+
+ Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {}
+ Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {}
+ void completed();
+ void received(const std::string& result);
+
+ };
+
+ typedef std::list<Record> Listeners;
+
sys::Mutex lock;
framing::SequenceNumber mark;
- std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+ Listeners listeners;
- bool add(const framing::SequenceNumber& point, Listener l);
+ bool add(const Record& r);
+ Listeners::iterator seek(const framing::SequenceNumber&);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Sep 6 13:27:33 2007
@@ -60,11 +60,11 @@
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
uint16_t id = frame.getChannel();
- SessionCore::shared_ptr session = sessions[id];
- if (!session) {
+ SessionMap::iterator i = sessions.find(id);
+ if (i == sessions.end()) {
throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
- session->handle(frame);
+ i->second->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -111,7 +111,8 @@
connector->send(frame);
}
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::shutdown()
+{
//this indicates that the socket to the server has closed
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
i->second->closed(0, "Unexpected socket closure.");
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Execution_
+#define _Execution_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace client {
+
+class Execution
+{
+public:
+ virtual ~Execution() {}
+ virtual void sendSyncRequest() = 0;
+ virtual void sendFlushRequest() = 0;
+ virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Sep 6 13:27:33 2007
@@ -97,8 +97,7 @@
void ExecutionHandler::flush()
{
- //send completion
- incoming.lwm = incoming.hwm;
+ sendCompletion();
}
void ExecutionHandler::noop()
@@ -106,48 +105,88 @@
//do nothing
}
-void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void ExecutionHandler::result(uint32_t command, const std::string& data)
{
- //TODO: need to signal the result to the appropriate listener
+ completion.received(command, data);
}
void ExecutionHandler::sync()
{
- //TODO: implement (the application is in charge of completion of
- //some commands, so need to track completion for them).
+ //TODO: implement - need to note the mark requested and then
+ //remember to send a response when that point is reached
+}
- //This shouldn't ever need to be called by the server (in my
- //opinion) as the server never needs to synchronise with the
- //clients execution
+void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
+{
+ if (point > outgoing.lwm) {
+ sendFlushRequest();
+ }
}
-void ExecutionHandler::sendFlush()
+void ExecutionHandler::sendFlushRequest()
{
AMQFrame frame(0, ExecutionFlushBody());
- out(frame);
+ out(frame);
}
-void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- //allocate id:
- ++outgoing.hwm;
- //register listeners if necessary:
- if (f) {
- completion.listen(outgoing.hwm, f);
- }
- if (g) {
- correlation.listen(g);
+ if (point > outgoing.lwm) {
+ sendSyncRequest();
+ }
+}
+
+
+void ExecutionHandler::sendSyncRequest()
+{
+ AMQFrame frame(0, ExecutionSyncBody());
+ out(frame);
+}
+
+void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
+{
+ if (id > completionStatus.mark) {
+ if (cumulative) {
+ completionStatus.update(completionStatus.mark, id);
+ } else {
+ completionStatus.update(id, id);
+ }
}
+ if (send) {
+ sendCompletion();
+ }
+}
+
+
+void ExecutionHandler::sendCompletion()
+{
+ SequenceNumberSet range;
+ completionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ out(frame);
+}
- AMQFrame frame(0/*id will be filled in be channel handler*/, command);
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+{
+ SequenceNumber id = ++outgoing.hwm;
+ if(l) {
+ completion.listenForResult(id, l);
+ }
+ AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
out(frame);
+ return id;
}
-void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f, Correlator::Listener g)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
+ CompletionTracker::ResultListener l)
{
- send(command, f, g);
+ SequenceNumber id = send(command, l);
+ sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ return id;
+}
+void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+{
AMQHeaderBody header;
BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Sep 6 13:27:33 2007
@@ -22,28 +22,34 @@
#define _ExecutionHandler_
#include <queue>
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
+#include "Execution.h"
namespace qpid {
namespace client {
class ExecutionHandler :
private framing::AMQP_ServerOperations::ExecutionHandler,
- public ChainableFrameHandler
+ public ChainableFrameHandler,
+ public Execution
{
framing::Window incoming;
framing::Window outgoing;
framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
+ framing::AccumulatedAck completionStatus;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -51,22 +57,29 @@
void result(uint32_t command, const std::string& data);
void sync();
+ void sendCompletion();
+
+ void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+
public:
- BlockingQueue<framing::FrameSet::shared_ptr> received;
+ typedef CompletionTracker::ResultListener ResultListener;
ExecutionHandler(uint64_t maxFrameSize = 65536);
- void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
-
void handle(framing::AMQFrame& frame);
- void send(const framing::AMQBody& command,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendContent(const framing::AMQBody& command,
- const framing::BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendFlush();
+ framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
+ framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
+ ResultListener=ResultListener());
+ void sendSyncRequest();
+ void sendFlushRequest();
+ void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
+ void syncTo(const framing::SequenceNumber& point);
+ void flushTo(const framing::SequenceNumber& point);
+
+ void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
+ Correlator& getCorrelator() { return correlation; }
+ CompletionTracker& getCompletionTracker() { return completion; }
+ BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Future_
+#define _Future_
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Exception.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/StructHelper.h"
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Future : private framing::StructHelper
+{
+ framing::SequenceNumber command;
+ boost::shared_ptr<FutureResponse> response;
+ boost::shared_ptr<FutureResult> result;
+ bool complete;
+
+public:
+ Future() : complete(false) {}
+ Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
+
+ void sync(SessionCore& session)
+ {
+ if (!complete) {
+ FutureCompletion callback;
+ session.getExecution().flushTo(command);
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureCompletion::completed, &callback)
+ );
+ callback.waitForCompletion();
+ session.checkClosed();
+ complete = true;
+ }
+ }
+
+ framing::AMQMethodBody* getResponse(SessionCore& session)
+ {
+ if (response) {
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureResponse::completed, response)
+ );
+ return response->getResponse(session);
+ } else {
+ throw Exception("Response not expected");
+ }
+ }
+
+ template <class T> void decodeResult(T& value, SessionCore& session)
+ {
+ if (result) {
+ decode(value, result->getResult(session));
+ } else {
+ throw Exception("Result not expected");
+ }
+ }
+
+ bool isComplete() {
+ return complete;
+ }
+
+ void setCommandId(const framing::SequenceNumber& id) { command = id; }
+ void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; }
+ void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; }
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp Thu Sep 6 13:27:33 2007
@@ -24,9 +24,9 @@
using namespace qpid::client;
using namespace qpid::sys;
-FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+FutureCompletion::FutureCompletion() : complete(false) {}
-bool FutureCompletion::isComplete()
+bool FutureCompletion::isComplete() const
{
Monitor::ScopedLock l(lock);
return complete;
@@ -39,23 +39,10 @@
lock.notifyAll();
}
-void FutureCompletion::waitForCompletion()
+void FutureCompletion::waitForCompletion() const
{
Monitor::ScopedLock l(lock);
- while (!complete && !closed) {
+ while (!complete) {
lock.wait();
}
- if (closed) {
- throw ChannelException(code, text);
- }
-}
-
-void FutureCompletion::close(uint16_t _code, const std::string& _text)
-{
- Monitor::ScopedLock l(lock);
- complete = true;
- closed = true;
- code = _code;
- text = _text;
- lock.notifyAll();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h Thu Sep 6 13:27:33 2007
@@ -31,19 +31,15 @@
class FutureCompletion
{
protected:
- sys::Monitor lock;
+ mutable sys::Monitor lock;
bool complete;
- bool closed;
- uint16_t code;
- std::string text;
public:
FutureCompletion();
virtual ~FutureCompletion(){}
- bool isComplete();
- void waitForCompletion();
+ bool isComplete() const;
+ void waitForCompletion() const;
void completed();
- void close(uint16_t code, const std::string& text);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Sep 6 13:27:33 2007
@@ -21,14 +21,17 @@
#include "FutureResponse.h"
+#include "SessionCore.h"
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody* FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
+ session.checkClosed();
return response.get();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Sep 6 13:27:33 2007
@@ -29,11 +29,13 @@
namespace qpid {
namespace client {
+class SessionCore;
+
class FutureResponse : public FutureCompletion
{
framing::MethodHolder response;
public:
- framing::AMQMethodBody* getResponse();
+ framing::AMQMethodBody* getResponse(SessionCore& session);
void received(framing::AMQMethodBody* response);
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp Thu Sep 6 13:27:33 2007
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResult.h"
+
+#include "SessionCore.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+const std::string& FutureResult::getResult(SessionCore& session) const
+{
+ waitForCompletion();
+ session.checkClosed();
+ return result;
+}
+
+void FutureResult::received(const std::string& r)
+{
+ Monitor::ScopedLock l(lock);
+ result = r;
+ complete = true;
+ lock.notifyAll();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResult_
+#define _FutureResult_
+
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class SessionCore;
+
+class FutureResult : public FutureCompletion
+{
+ std::string result;
+public:
+ const std::string& getResult(SessionCore& session) const;
+ void received(const std::string& result);
+};
+
+}}
+
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h Thu Sep 6 13:27:33 2007
@@ -24,33 +24,26 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
-#include "FutureResponse.h"
+#include "Completion.h"
namespace qpid {
namespace client {
-class Response
+class Response : public Completion
{
- boost::shared_ptr<FutureResponse> future;
-
public:
- Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+ Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {}
template <class T> T& as()
{
- framing::AMQMethodBody* response(future->getResponse());
- assert(response);
+ framing::AMQMethodBody* response(future.getResponse(*session));
return *boost::polymorphic_downcast<T*>(response);
}
+
template <class T> bool isA()
{
- framing::AMQMethodBody* response(future->getResponse());
+ framing::AMQMethodBody* response(future.getResponse(*session));
return response && response->isA<T>();
- }
-
- void sync()
- {
- return future->waitForCompletion();
}
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ScopedAssociation_
+#define _ScopedAssociation_
+
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+struct ScopedAssociation
+{
+ typedef boost::shared_ptr<ScopedAssociation> shared_ptr;
+
+ SessionCore::shared_ptr session;
+ ConnectionImpl::shared_ptr connection;
+
+ ScopedAssociation() {}
+
+ ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c)
+ {
+ connection->allocated(session);
+ }
+
+ ~ScopedAssociation()
+ {
+ if (connection && session) connection->released(session);
+ }
+};
+
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Thu Sep 6 13:27:33 2007
@@ -21,12 +21,15 @@
#include "SessionCore.h"
#include <boost/bind.hpp>
+#include "Future.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
using namespace qpid::client;
using namespace qpid::framing;
SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
- uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false)
+ uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
{
l2.out = boost::bind(&FrameHandler::handle, out, _1);
l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
@@ -39,47 +42,15 @@
l2.open(id);
}
-void SessionCore::flush()
+ExecutionHandler& SessionCore::getExecution()
{
- l3.sendFlush();
-}
-
-Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
-{
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.send(method, boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
-}
-
-Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
-{
- //TODO: lots of duplication between these two send methods; refactor
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
+ checkClosed();
+ return l3;
}
FrameSet::shared_ptr SessionCore::get()
{
- return l3.received.pop();
+ return l3.getReceived().pop();
}
void SessionCore::setSync(bool s)
@@ -95,12 +66,13 @@
void SessionCore::close()
{
l2.close();
- l3.received.close();
+ stop();
}
void SessionCore::stop()
{
- l3.received.close();
+ l3.getReceived().close();
+ l3.getCompletionTracker().close();
}
void SessionCore::handle(AMQFrame& frame)
@@ -110,6 +82,46 @@
void SessionCore::closed(uint16_t code, const std::string& text)
{
- l3.received.close();
- futures.close(code, text);
+ stop();
+
+ isClosed = true;
+ reason.code = code;
+ reason.text = text;
+}
+
+void SessionCore::checkClosed()
+{
+ if (isClosed) {
+ throw ChannelException(reason.code, reason.text);
+ }
+}
+
+Future SessionCore::send(const AMQBody& command)
+{
+ Future f;
+ //any result/response listeners must be set before the command is sent
+ if (command.getMethod()->resultExpected()) {
+ boost::shared_ptr<FutureResult> r(new FutureResult());
+ f.setFutureResult(r);
+ //result listener is tied to command id, and is set when that
+ //is allocated by the execution handler, so pass it to send
+ f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1)));
+ } else {
+ if (command.getMethod()->responseExpected()) {
+ boost::shared_ptr<FutureResponse> r(new FutureResponse());
+ f.setFutureResponse(r);
+ l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1));
+ }
+
+ f.setCommandId(l3.send(command));
+ }
+ return f;
+}
+
+Future SessionCore::send(const AMQBody& command, const MethodContent& content)
+{
+ //content bearing methods don't currently have responses or
+ //results, if that changes should follow procedure for the other
+ //send method impl:
+ return Future(l3.send(command, content));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Thu Sep 6 13:27:33 2007
@@ -22,6 +22,7 @@
#ifndef _SessionCore_
#define _SessionCore_
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
@@ -29,35 +30,44 @@
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
-#include "FutureFactory.h"
-#include "Response.h"
namespace qpid {
namespace client {
+class Future;
+
class SessionCore : public framing::FrameHandler
{
+ struct Reason
+ {
+ uint16_t code;
+ std::string text;
+ };
+
ExecutionHandler l3;
ChannelHandler l2;
- FutureFactory futures;
const uint16_t id;
bool sync;
+ bool isClosed;
+ Reason reason;
public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
- Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();
- void flush();
void open();
void close();
void stop();
void closed(uint16_t code, const std::string& text);
+ void checkClosed();
+ ExecutionHandler& getExecution();
+
+ Future send(const framing::AMQBody& command);
+ Future send(const framing::AMQBody& command, const framing::MethodContent& content);
//for incoming frames:
void handle(framing::AMQFrame& frame);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h Thu Sep 6 13:27:33 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TypedResult_
+#define _TypedResult_
+
+#include "Completion.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class TypedResult : public Completion
+{
+ T result;
+ bool decoded;
+
+public:
+ TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {}
+
+ T& get()
+ {
+ if (!decoded) {
+ future.decodeResult(result, *session);
+ decoded = true;
+ }
+
+ return result;
+ }
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Thu Sep 6 13:27:33 2007
@@ -50,7 +50,9 @@
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
virtual bool isContentBearing() const = 0;
-
+ virtual bool resultExpected() const = 0;
+ virtual bool responseExpected() const = 0;
+
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp (from r572247, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp&r1=572247&r2=573359&rev=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp Thu Sep 6 13:27:33 2007
@@ -26,9 +26,9 @@
using std::list;
using std::max;
using std::min;
-using namespace qpid::broker;
+using namespace qpid::framing;
-void AccumulatedAck::update(DeliveryId first, DeliveryId last){
+void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){
assert(first <= last);
if (last < mark) return;
@@ -84,7 +84,7 @@
ranges.clear();
}
-bool AccumulatedAck::covers(DeliveryId tag) const{
+bool AccumulatedAck::covers(SequenceNumber tag) const{
if (tag <= mark) return true;
for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
if (i->contains(tag)) return true;
@@ -92,7 +92,15 @@
return false;
}
-bool Range::contains(DeliveryId i) const
+void AccumulatedAck::collectRanges(SequenceNumberSet& set) const
+{
+ for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ set.push_back(i->start);
+ set.push_back(i->end);
+ }
+}
+
+bool Range::contains(SequenceNumber i) const
{
return i >= start && i <= end;
}
@@ -113,7 +121,7 @@
}
}
-bool Range::mergeable(const DeliveryId& s) const
+bool Range::mergeable(const SequenceNumber& s) const
{
if (contains(s) || start - s == 1) {
return true;
@@ -122,11 +130,11 @@
}
}
-Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
+Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
namespace qpid{
-namespace broker{
+namespace framing{
std::ostream& operator<<(std::ostream& out, const Range& r)
{
out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h (from r572394, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h&r1=572394&r2=573359&rev=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h Thu Sep 6 13:27:33 2007
@@ -25,21 +25,22 @@
#include <functional>
#include <list>
#include <ostream>
-#include "DeliveryId.h"
+#include "SequenceNumber.h"
+#include "SequenceNumberSet.h"
namespace qpid {
- namespace broker {
+ namespace framing {
struct Range
{
- DeliveryId start;
- DeliveryId end;
+ SequenceNumber start;
+ SequenceNumber end;
- Range(DeliveryId s, DeliveryId e);
- bool contains(DeliveryId i) const;
+ Range(SequenceNumber s, SequenceNumber e);
+ bool contains(SequenceNumber i) const;
bool intersect(const Range& r) const;
bool merge(const Range& r);
- bool mergeable(const DeliveryId& r) const;
+ bool mergeable(const SequenceNumber& r) const;
};
/**
* Keeps an accumulated record of acked messages (by delivery
@@ -50,18 +51,19 @@
/**
* Everything up to this value has been acked.
*/
- DeliveryId mark;
+ SequenceNumber mark;
/**
* List of individually acked messages greater than the
* 'mark'.
*/
std::list<Range> ranges;
- explicit AccumulatedAck(DeliveryId r) : mark(r) {}
- void update(DeliveryId firstTag, DeliveryId lastTag);
+ explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {}
+ void update(SequenceNumber firstTag, SequenceNumber lastTag);
void consolidate();
void clear();
- bool covers(DeliveryId tag) const;
+ bool covers(SequenceNumber tag) const;
+ void collectRanges(SequenceNumberSet& set) const;
};
std::ostream& operator<<(std::ostream&, const Range&);
std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h Thu Sep 6 13:27:33 2007
@@ -44,7 +44,7 @@
rbuffer.getRawData(data, size);
}
- template <class T> void decode(T t, std::string& data) {
+ template <class T> void decode(T& t, const std::string& data) {
char* bytes = static_cast<char*>(::alloca(data.length()));
Buffer wbuffer(bytes, data.length());
wbuffer.putRawData(data);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp Thu Sep 6 13:27:33 2007
@@ -19,13 +19,13 @@
* under the License.
*
*/
-#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include <list>
using std::list;
-using namespace qpid::broker;
+using namespace qpid::framing;
class AccumulatedAckTest : public CppUnit::TestCase
{
@@ -44,12 +44,12 @@
public:
bool covers(const AccumulatedAck& ack, int i)
{
- return ack.covers(DeliveryId(i));
+ return ack.covers(SequenceNumber(i));
}
void update(AccumulatedAck& ack, int start, int end)
{
- ack.update(DeliveryId(start), DeliveryId(end));
+ ack.update(SequenceNumber(start), SequenceNumber(end));
}
void testGeneral()
Added: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Sep 6 13:27:33 2007
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include "qpid_test_plugin.h"
+#include "InProcessBroker.h"
+#include "qpid/client/Session.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+class ClientSessionTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ClientSessionTest);
+ CPPUNIT_TEST(testQueueQuery);;
+ CPPUNIT_TEST_SUITE_END();
+
+ boost::shared_ptr<Connector> broker;
+ Connection connection;
+ Session session;
+
+ public:
+
+ ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker)
+ {
+ connection.open("");
+ session = connection.newSession();
+ }
+
+ void testQueueQuery()
+ {
+ std::string name("my-queue");
+ std::string alternate("amq.fanout");
+ session.queueDeclare(0, name, alternate, false, false, true, true, FieldTable());
+ TypedResult<QueueQueryResult> result = session.queueQuery(name);
+ CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
+ CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
+ CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
+ }
+
+ void testCompletion()
+ {
+ std::string queue("my-queue");
+ std::string dest("my-dest");
+ session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
+ //subcribe to the queue with confirm_mode = 1
+ session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
+ //publish some messages
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest);
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Thu Sep 6 13:27:33 2007
@@ -101,6 +101,7 @@
) : sender(sender_), conversation(conversation_), in(ih) {}
void send(framing::AMQFrame& frame) {
+ //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl;
conversation.push_back(TaggedFrame(sender, frame));
in->received(frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Sep 6 13:27:33 2007
@@ -94,10 +94,11 @@
TxPublishTest \
ValueTest \
MessageHandlerTest \
- MessageBuilderTest
+ MessageBuilderTest \
+ ClientSessionTest
#client_unit_tests = \
- ClientChannelTest
+ ClientChannelTest
framing_unit_tests = \
FieldTableTest \