You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/03 15:49:11 UTC
svn commit: r633108 - in /incubator/qpid/trunk/qpid/cpp: rubygen/
rubygen/99-0/ src/ src/qpid/broker/ src/qpid/framing/ src/qpid/sys/
src/tests/ xml/
Author: gsim
Date: Mon Mar 3 06:49:06 2008
New Revision: 633108
URL: http://svn.apache.org/viewvc?rev=633108&view=rev
Log:
A further step towards the final 0-10 spec.
The extra.xml fragment adds class defs for connection and session that are in line with the latest spec but use the old schema.
The preview codepath (99-0) remains unaltered.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp (with props)
incubator/qpid/trunk/qpid/cpp/xml/extra.xml (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Proxy.rb
incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb
incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb
incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHolder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types_full.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Proxy.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Proxy.rb?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Proxy.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Proxy.rb Mon Mar 3 06:49:06 2008
@@ -41,6 +41,7 @@
# .h file
h_file(@filename) {
include "qpid/framing/Proxy.h"
+ include "qpid/framing/Array.h"
include "qpid/framing/amqp_types.h"
namespace("qpid::framing") {
cpp_class(@classname, "public Proxy") {
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb Mon Mar 3 06:49:06 2008
@@ -6,7 +6,7 @@
class CppGen
def session_methods
- excludes = ["channel", "connection", "session", "execution"]
+ excludes = ["channel", "connection", "session", "execution", "connection010", "session010"]
gen_methods=@amqp.methods_on(@chassis).reject { |m|
excludes.include? m.parent.name
}
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb Mon Mar 3 06:49:06 2008
@@ -17,6 +17,7 @@
"longlong"=>"LongLong",
"longstr"=>"LongString",
"shortstr"=>"ShortString",
+ "mediumstr"=>"MediumString",
"timestamp"=>"LongLong",
"table"=>"FieldTable",
"content"=>"Content",
@@ -33,7 +34,8 @@
ValueTypes=["octet", "short", "long", "longlong", "timestamp"]
def is_packed(s)
- s.kind_of? AmqpStruct
+ #return true
+ s.kind_of?(AmqpStruct) or s.body_name.include?("010")
end
def execution_header?(s)
@@ -182,12 +184,21 @@
end
end
+ def all_fields_via_accessors(s)
+ s.fields.collect { |f| "get#{f.name.caps}()" }.join(", ")
+ end
+
def methodbody_extra_defs(s)
+ if (s.parent.control?)
+ genl "virtual uint8_t type() const { return 0;/*control segment*/ }"
+ end
+
+
gen <<EOS
typedef #{s.result ? s.result.struct.cpptype.name : 'void'} ResultType;
template <class T> ResultType invoke(T& invocable) const {
- return invocable.#{s.cppname}(#{s.param_names.join ", "});
+ return invocable.#{s.cppname}(#{all_fields_via_accessors(s)});
}
using AMQMethodBody::accept;
@@ -235,6 +246,14 @@
end
def define_packed_field_accessors(s, f, i)
+ if (s.kind_of? AmqpMethod)
+ define_packed_field_accessors_for_method(s, f, i)
+ else
+ define_packed_field_accessors_for_struct(s, f, i)
+ end
+ end
+
+ def define_packed_field_accessors_for_struct(s, f, i)
if (f.domain.type_ == "bit")
genl "void #{s.cppname}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
indent {
@@ -265,6 +284,37 @@
genl ""
end
+ def define_packed_field_accessors_for_method(s, f, i)
+ if (f.domain.type_ == "bit")
+ genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "if (_#{f.cppname}) flags |= #{flag_mask(s, i)};"
+ genl "else flags &= ~#{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ else
+ genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "#{f.cppname} = _#{f.cppname};"
+ genl "flags |= #{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return #{f.cppname}; }"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& #{s.body_name}::get#{f.name.caps}() {"
+ indent {
+ genl "flags |= #{flag_mask(s, i)};"#treat the field table as having been 'set'
+ genl "return #{f.cppname};"
+ }
+ genl "}"
+ end
+ genl "bool #{s.body_name}::has#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ genl "void #{s.body_name}::clear#{f.name.caps}Flag() { flags &= ~#{flag_mask(s, i)}; }"
+ end
+ genl ""
+ end
+
def define_packed_accessors(s)
process_packed_fields(s) { |f, i| define_packed_field_accessors(s, f, i) }
end
@@ -383,7 +433,7 @@
EOS
}
cpp_file("qpid/framing/#{classname}.cpp") {
- if (s.fields.size > 0 || execution_header?(s))
+ if (is_packed(s) || s.fields.size > 0 || execution_header?(s))
buffer = "buffer"
else
buffer = "/*buffer*/"
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Mon Mar 3 06:49:06 2008
@@ -343,6 +343,10 @@
!["connection", "session", "execution"].include?(name)
end
+ def control?()
+ ["connection010", "session010"].include?(name)
+ end
+
def actions() controls+commands; end
end
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb Mon Mar 3 06:49:06 2008
@@ -142,6 +142,13 @@
def param_names() fields.map { |f| f.cppname }; end
def signature() fields.map { |f| f.signature }; end
def body_name() parent.name.caps+name.caps+"Body"; end
+
+ def cpp_pack_type() # preview
+ CppType.new("uint16_t").code("Short").defval("0");
+ end
+ def pack() # preview
+ "short"
+ end
end
module AmqpHasFields
@@ -182,10 +189,12 @@
"timestamp"=>CppType.new("uint64_t").code("LongLong").defval("0"),
"longstr"=>CppType.new("string").passcref.retcref.code("LongString"),
"shortstr"=>CppType.new("string").passcref.retcref.code("ShortString"),
+ "mediumstr"=>CppType.new("string").passcref.retcref.code("MediumString"),
"table"=>CppType.new("FieldTable").passcref.retcref,
"array"=>CppType.new("Array").passcref.retcref,
"content"=>CppType.new("Content").passcref.retcref,
"rfc1982-long-set"=>CppType.new("SequenceNumberSet").passcref.retcref,
+ "sequence-set"=>CppType.new("SequenceSet").passcref.retcref,
"long-struct"=>CppType.new("string").passcref.retcref.code("LongString"),
"uuid"=>CppType.new("Uuid").passcref.retcref
}
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Mar 3 06:49:06 2008
@@ -14,7 +14,7 @@
if GENERATE
# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
-amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml
+amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml
amqp_0_10_xml=@AMQP_FINAL_XML@
specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
@@ -130,6 +130,7 @@
qpid/framing/SendContent.cpp \
qpid/framing/SequenceNumber.cpp \
qpid/framing/SequenceNumberSet.cpp \
+ qpid/framing/SequenceSet.cpp \
qpid/framing/Proxy.cpp \
qpid/framing/Uuid.cpp \
qpid/framing/AMQP_HighestVersion.h \
@@ -412,6 +413,7 @@
qpid/framing/SessionState.h \
qpid/framing/SendContent.h \
qpid/framing/SequenceNumber.h \
+ qpid/framing/SequenceSet.h \
qpid/framing/SequenceNumberSet.h \
qpid/framing/SerializeHandler.h \
qpid/framing/StructHelper.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Mon Mar 3 06:49:06 2008
@@ -85,6 +85,8 @@
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
+ Connection010Handler* getConnection010Handler() { BADHANDLER(); }
+ Session010Handler* getSession010Handler() { BADHANDLER(); }
#undef BADHANDLER
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Mar 3 06:49:06 2008
@@ -23,6 +23,7 @@
#include "ConnectionHandler.h"
#include "Connection.h"
#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/Connection010StartBody.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
@@ -38,11 +39,14 @@
}
void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
+ //need to send out a protocol header back to the client
+ handler->connection.getOutput().initiated(header);
+
FieldTable properties;
string mechanisms(PLAIN);
string locales(en_US);
- handler->serverMode = true;
- handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+ handler->serverMode = true;
+ handler->client.start(properties, mechanisms, locales);
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
@@ -55,7 +59,7 @@
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (handler->serverMode) {
- if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method))
throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
} else {
if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Mon Mar 3 06:49:06 2008
@@ -41,10 +41,10 @@
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
public framing::AMQP_ClientOperations::ConnectionHandler
{
- framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ClientProxy::Connection010 client;
framing::AMQP_ServerProxy::Connection server;
Connection& connection;
bool serverMode;
@@ -55,6 +55,7 @@
const std::string& locale);
void secureOk(const std::string& response);
void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void heartbeat() {}
void open(const std::string& virtualHost,
const std::string& capabilities, bool insist);
void close(uint16_t replyCode, const std::string& replyText,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Mar 3 06:49:06 2008
@@ -387,7 +387,7 @@
++end;
}
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -427,16 +427,16 @@
}
}
-void SemanticState::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::adjustFlow(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->acknowledged(delivery);
+ get_pointer(i)->adjustFlow(delivery);
}
}
-void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -637,6 +637,35 @@
{
//TODO: think through properly
parent->outputTasks.activateOutput();
+}
+
+
+void SemanticState::accepted(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(range.start, range.end);
+ }
+}
+
+void SemanticState::completed(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ requestDispatch();
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Mar 3 06:49:06 2008
@@ -88,7 +88,7 @@
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
@@ -122,7 +122,7 @@
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
@@ -171,8 +171,6 @@
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
@@ -180,8 +178,15 @@
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
void handle(intrusive_ptr<Message> msg);
-
bool doOutput() { return outputTasks.doOutput(); }
+
+ //preview only (completed == ack):
+ void ackCumulative(DeliveryId deliveryTag);
+ void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
+
+ //final 0-10 spec (completed and accepted are distinct):
+ void completed(DeliveryId deliveryTag, DeliveryId endTag);
+ void accepted(DeliveryId deliveryTag, DeliveryId endTag);
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon Mar 3 06:49:06 2008
@@ -60,17 +60,10 @@
AMQMethodBody* m = f.getBody()->getMethod();
try {
if (!ignoring) {
- if (m &&
- (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
- invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
return;
} else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
session->handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
} else {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
@@ -80,7 +73,8 @@
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
session.reset();
- peerSession.closed(e.code, e.what());
+ //TODO: implement new exception handling mechanism
+ //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -92,7 +86,7 @@
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
- peerSession.solicitAck();
+ peerSession.flush(false, false, true);
}
void SessionHandler::assertAttached(const char* method) const {
@@ -111,136 +105,123 @@
<< " is already open."));
}
-void SessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
+void SessionHandler::localSuspend() {
+ if (session.get() && session->isAttached()) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+ }
}
-void SessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-void SessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+//new methods:
+void SessionHandler::attach(const std::string& name, bool /*force*/)
+{
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
+ peerSession.attached(name);
}
-void SessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
+void SessionHandler::attached(const std::string& /*name*/)
+{
+ std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
}
-void SessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
+void SessionHandler::detach(const std::string& name)
+{
+ assertAttached("detach");
+ localSuspend();
+ peerSession.detached(name, 0);
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
-void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+void SessionHandler::detached(const std::string& name, uint8_t code)
+{
ignoring=false;
session->detach();
session.reset();
-}
-
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ if (code) {
+ //no error
+ } else {
+ //error occured
+ QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
}
-void SessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
+void SessionHandler::requestTimeout(uint32_t t)
{
- assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&SessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
+ session->setTimeout(t);
+ //proxy.timeout(t);
}
-void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
+void SessionHandler::timeout(uint32_t)
+{
+ //not sure what we need to do on the server for this...
}
-void SessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
+void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
+{
+ if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
+
+ session->next = id;
}
-void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
+ if (!commands.empty() || fragments.size()) {
+ throw NotImplementedException("Session resumption not yet supported");
+ }
}
-void SessionHandler::detached()
+void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
{
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ //don't really care too much about this yet
}
-
-ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
{
- assertAttached("complete");
- session->complete(cumulative, range);
+ session->complete(commands);
+ if (timelyReply) {
+ peerSession.knownCompleted(session->knownCompleted);
+ session->knownCompleted.clear();
+ }
}
-void SessionHandler::flush()
+void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
{
- assertAttached("flush");
- session->flush();
+ session->completed.remove(commands);
}
-void SessionHandler::sync()
+
+void SessionHandler::flush(bool expected, bool confirmed, bool completed)
{
- assertAttached("sync");
- session->sync();
+ if (expected) {
+ peerSession.expected(SequenceSet(session->next), Array());
+ }
+ if (confirmed) {
+ peerSession.confirmed(session->completed, Array());
+ }
+ if (completed) {
+ peerSession.completed(session->completed, true);
+ }
}
-void SessionHandler::noop()
+
+void SessionHandler::sendCompletion()
{
- assertAttached("noop");
- session->noop();
+ peerSession.completed(session->completed, true);
}
-void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
{
- //never actually sent by client at present
+ throw NotImplementedException("gap not yet supported");
}
-
+
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Mon Mar 3 06:49:06 2008
@@ -27,8 +27,10 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Array.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
#include <boost/noncopyable.hpp>
@@ -44,9 +46,7 @@
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::AMQP_ClientOperations::SessionHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
@@ -69,35 +69,32 @@
// Called by closing connection.
void localSuspend();
void detach() { localSuspend(); }
+ void sendCompletion();
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
private:
- /// Session methods
- void open(uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void close();
- void closed(uint16_t replyCode, const std::string& replyText);
- void resume(const framing::Uuid& sessionId);
- void suspend();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
-
- //extra methods required for assuming client role
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void detached();
-
- //Execution methods:
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
+ //new methods:
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t code);
+
+ void requestTimeout(uint32_t t);
+ void timeout(uint32_t t);
+
+ void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ //hacks for old generator:
+ void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
@@ -106,7 +103,7 @@
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
+ framing::AMQP_ClientProxy::Session010 peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Mar 3 06:49:06 2008
@@ -49,7 +49,7 @@
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -170,9 +170,9 @@
void SessionState::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = next++;
Invoker::Result invocation = invoke(adapter, *method);
- incoming.complete(id);
+ completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
@@ -180,7 +180,6 @@
getProxy().getExecution().result(id.getValue(), invocation.getResult());
}
if (method->isSync()) {
- incoming.sync(id);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -190,7 +189,8 @@
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
+ SequenceNumber id = next++;
+ msgBuilder.start(id);
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
@@ -198,9 +198,9 @@
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- incoming.track(msg);
+ //TODO: may want to hold up execution until async enqueue is complete
+ completed.add(msg->getCommandId());
if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
sendCompletion();
}
}
@@ -208,6 +208,8 @@
void SessionState::handle(AMQFrame& frame)
{
+ received(frame);
+
//TODO: make command handling more uniform, regardless of whether
//commands carry content. (For now, assume all single frame
//assemblies are non-content bearing and all content-bearing
@@ -229,38 +231,13 @@
void SessionState::sendCompletion()
{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- semanticState.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SessionState::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-
-void SessionState::sync()
-{
- incoming.sync();
- sendCompletion();
+ handler->sendCompletion();
}
-void SessionState::noop()
+void SessionState::complete(const SequenceSet& commands)
{
- incoming.noop();
+ knownCompleted.add(commands);
+ commands.for_each(ackOp);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Mar 3 06:49:06 2008
@@ -25,6 +25,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
@@ -83,6 +84,8 @@
ConnectionState& getConnection();
uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t t) { timeout = t; }
+
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
@@ -93,10 +96,7 @@
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void sync();
+ void complete(const framing::SequenceSet& ranges);
void sendCompletion();
//delivery adapter methods:
@@ -114,6 +114,10 @@
uint32_t ackInterval);
+ framing::SequenceSet completed;
+ framing::SequenceSet knownCompleted;
+ framing::SequenceNumber next;
+
private:
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -130,8 +134,6 @@
BrokerAdapter adapter;
MessageBuilder msgBuilder;
- //execution state
- IncomingExecutionContext incoming;
framing::Window outgoing;
RangedOperation ackOp;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp Mon Mar 3 06:49:06 2008
@@ -83,7 +83,7 @@
void AccumulatedAck::consolidate(){}
void AccumulatedAck::clear(){
- mark = 0;//not sure that this is valid when wraparound is a possibility
+ mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility
ranges.clear();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHolder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHolder.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHolder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHolder.cpp Mon Mar 3 06:49:06 2008
@@ -48,6 +48,7 @@
void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) {
switch(type)
{
+ case 0://CONTROL
case METHOD_BODY: {
ClassId c = buffer.getOctet();
MethodId m = buffer.getOctet();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Mon Mar 3 06:49:06 2008
@@ -194,6 +194,13 @@
position += len;
}
+// Writes s as a "medium" string: a 16-bit length (via putShort) followed by
+// that many bytes copied from s, advancing position past the copied bytes.
+// NOTE(review): s.length() is narrowed to uint16_t, so for a string longer
+// than 65535 bytes the length wraps and only that many bytes are written --
+// confirm callers never pass such a string.
+void Buffer::putMediumString(const string& s){
+ uint16_t len = s.length();
+ putShort(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
void Buffer::putLongString(const string& s){
uint32_t len = s.length();
putLong(len);
@@ -203,6 +210,13 @@
void Buffer::getShortString(string& s){
uint8_t len = getOctet();
+ checkAvailable(len);
+ s.assign(data + position, len);
+ position += len;
+}
+
+void Buffer::getMediumString(string& s){
+ uint16_t len = getShort();
checkAvailable(len);
s.assign(data + position, len);
position += len;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Mon Mar 3 06:49:06 2008
@@ -97,8 +97,10 @@
void putUInt(uint64_t);
void putShortString(const string& s);
+ void putMediumString(const string& s);
void putLongString(const string& s);
void getShortString(string& s);
+ void getMediumString(string& s);
void getLongString(string& s);
void getBin128(uint8_t* b);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Mon Mar 3 06:49:06 2008
@@ -51,9 +51,10 @@
return old;
}
-SequenceNumber SequenceNumber::operator+(uint32_t i) const
+SequenceNumber& SequenceNumber::operator--()
{
- return SequenceNumber(value + i);
+ value = value - 1;
+ return *this;
}
bool SequenceNumber::operator<(const SequenceNumber& other) const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Mon Mar 3 06:49:06 2008
@@ -39,7 +39,7 @@
SequenceNumber& operator++();//prefix ++
const SequenceNumber operator++(int);//postfix ++
- SequenceNumber operator+(uint32_t) const;
+ SequenceNumber& operator--();//prefix --
bool operator==(const SequenceNumber& other) const;
bool operator!=(const SequenceNumber& other) const;
bool operator<(const SequenceNumber& other) const;
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp?rev=633108&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp Mon Mar 3 06:49:06 2008
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceSet.h"
+
+using namespace qpid::framing;
+using std::max;
+using std::min;
+
+namespace {
+//encoded size in bytes of one range: 2 sequence numbers, 4 bytes each
+const uint16_t RANGE_SIZE = 2 * 4;
+}
+
+// Writes the set to buffer: first the total byte count of the encoded ranges
+// (number of ranges * RANGE_SIZE) via putShort, then each range in list order.
+// NOTE(review): the byte count is written with putShort; whether a very large
+// number of ranges can overflow that field depends on putShort's width -- confirm.
+void SequenceSet::encode(Buffer& buffer) const
+{
+ buffer.putShort(ranges.size() * RANGE_SIZE);
+ for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ i->encode(buffer);
+ }
+}
+
+// Reads an encoded set from buffer: a 16-bit byte count followed by that many
+// bytes of (start, end) pairs. Each pair is added to this set (existing
+// contents are not cleared first).
+// Throws FrameErrorException if the byte count is not a multiple of RANGE_SIZE.
+void SequenceSet::decode(Buffer& buffer)
+{
+    uint16_t size = buffer.getShort();
+    uint16_t count = size / RANGE_SIZE;//number of ranges
+    if (size % RANGE_SIZE) throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size));
+
+    for (uint16_t i = 0; i < count; i++) {
+        //read into locals: the order in which function arguments are
+        //evaluated is unspecified, so two getLong() calls inside one
+        //argument list could consume the buffer in either order
+        SequenceNumber start(buffer.getLong());
+        SequenceNumber end(buffer.getLong());
+        add(start, end);
+    }
+}
+
+// Returns the number of bytes encode() writes: the 2-byte size field plus
+// RANGE_SIZE bytes per range. (This is the encoded size, not the element count.)
+uint32_t SequenceSet::size() const
+{
+ return 2 /*size field*/ + (ranges.size() * RANGE_SIZE);
+}
+
+// Returns true if point lies within any of the ranges held by this set.
+bool SequenceSet::contains(const SequenceNumber& point) const
+{
+    Ranges::const_iterator it = ranges.begin();
+    while (it != ranges.end()) {
+        if (it->contains(point)) return true;
+        ++it;
+    }
+    return false;
+}
+
+// Adds the single sequence number s, expressed as the one-element range [s, s].
+void SequenceSet::add(const SequenceNumber& s)
+{
+ add(s, s);
+}
+
+// Adds the closed interval [start, end] to the set. The bounds may be given
+// in either order.
+//
+// The new range absorbs every existing range it overlaps or is directly
+// adjacent to, and is then inserted before the first range that lies wholly
+// after it. Previously only ranges starting before 'start' were considered
+// for merging, so re-adding a value or adding an overlapping range left
+// duplicate/overlapping entries in the list.
+void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& end)
+{
+    if (start > end) {
+        add(end, start);
+    } else {
+        Range r(start, end);
+        Ranges::iterator i = ranges.begin();
+        while (i != ranges.end()) {
+            if (r.merge(*i)) {
+                //r now covers the existing range as well, so drop the original
+                i = ranges.erase(i);
+            } else if (r.end < i->start) {
+                //i lies wholly after r (and is not adjacent): insertion point found
+                break;
+            } else {
+                ++i;
+            }
+        }
+        ranges.insert(i, r);
+    }
+}
+
+// Adds every range held by set to this set, one range at a time.
+void SequenceSet::add(const SequenceSet& set)
+{
+ for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ add(i->start, i->end);
+ }
+}
+
+// Removes every range held by set from this set, one range at a time.
+void SequenceSet::remove(const SequenceSet& set)
+{
+ for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ remove(i->start, i->end);
+ }
+}
+
+// Removes every sequence number in the closed interval [start, end] from the
+// set. The bounds may be given in either order.
+//
+// Ranges that begin before 'start' are truncated or split; ranges that begin
+// at or after 'start' are erased when fully covered, and the first range
+// extending beyond 'end' has its start moved past 'end'.
+void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end)
+{
+    if (start > end) {
+        remove(end, start);
+    } else {
+        Ranges::iterator i = ranges.begin();
+        while (i != ranges.end() && i->start < start) {
+            if (start <= i->end) {
+                if (end > i->end) {
+                    //i.e. start is within the range pointed to by i, but end is not
+                    i->end = (uint32_t)start - 1;
+                } else {
+                    //whole of range to be deleted is contained within that pointed to by i
+                    if (end == i->end) {
+                        //just shrink range pointed to by i
+                        i->end = (uint32_t)start - 1;
+                    } else {
+                        //need to split the range pointed to by i
+                        Range r(i->start, (uint32_t)start - 1);
+                        i->start = end + 1;
+                        ranges.insert(i, r);
+                    }
+                    return;//no need to go any further
+                }
+            }
+            i++;
+        }
+        //i is now the first range beginning at or after start; move j past
+        //every range that ends at or before end, i.e. is fully covered.
+        //(Using <= rather than < ensures a range ending exactly at end is
+        //erased instead of being left with start > end.)
+        Ranges::iterator j = i;
+        while (j != ranges.end() && j->end <= end) {
+            j++;
+        }
+        //j may be ranges.end(), which must not be dereferenced
+        if (j != ranges.end() && j->start <= end){
+            j->start = end + 1;
+        }
+        ranges.erase(i, j);
+    }
+}
+
+// Removes the single sequence number s from the set: a one-element range
+// equal to s is erased, a range starting or ending at s is shrunk, and a
+// range containing s in its interior is split in two.
+void SequenceSet::remove(const SequenceNumber& s)
+{
+    Ranges::iterator i = ranges.begin();
+    while (i != ranges.end() && s >= i->start) {
+        if (i->start == s) {
+            if (i->start == i->end) {
+                //erase() invalidates i; carry on from the iterator it returns
+                //rather than incrementing the erased one
+                i = ranges.erase(i);
+                continue;
+            }
+            ++(i->start);
+        } else if (i->end == s) {
+            --(i->end);
+        } else if (i->contains(s)) {
+            //need to split range pointed to by i
+            Range r(i->start, (uint32_t)s - 1);
+            i->start = s + 1;
+            ranges.insert(i, r);
+        }
+        ++i;
+    }
+}
+
+// Returns true if the set holds no ranges.
+bool SequenceSet::empty() const
+{
+ return ranges.empty();
+}
+
+// Discards every range, leaving the set empty.
+void SequenceSet::clear()
+{
+    ranges.clear();
+}
+
+// Returns true if i lies within the closed interval [start, end].
+bool SequenceSet::Range::contains(SequenceNumber i) const
+{
+ return i >= start && i <= end;
+}
+
+// Returns true if this range and r share at least one value, i.e. if either
+// range contains an endpoint of the other.
+bool SequenceSet::Range::intersects(const Range& r) const
+{
+ return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end);
+}
+
+// Widens this range to also cover r when the two intersect or when either
+// range's end is mergeable with the other. Returns true if this range was
+// updated, false (leaving it untouched) otherwise.
+bool SequenceSet::Range::merge(const Range& r)
+{
+    const bool combinable = intersects(r) || mergeable(r.end) || r.mergeable(end);
+    if (!combinable) return false;
+
+    start = min(start, r.start);
+    end = max(end, r.end);
+    return true;
+}
+
+// Returns true if s is inside this range, or if start - s == 1 (s is the
+// value directly before the start of this range).
+bool SequenceSet::Range::mergeable(const SequenceNumber& s) const
+{
+    return contains(s) || start - s == 1;
+}
+
+// Writes this range to buffer as two putLong values: start, then end.
+void SequenceSet::Range::encode(Buffer& buffer) const
+{
+ buffer.putLong(start);
+ buffer.putLong(end);
+}
+
+// Constructs the range [s, e]; the bounds are stored as given, without reordering.
+SequenceSet::Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
+
+namespace qpid{
+namespace framing{
+
+// Prints the set as "{start-end, start-end, ...}" using the raw value of each
+// bound; an empty set prints as "{}".
+std::ostream& operator<<(std::ostream& out, const SequenceSet& set) {
+ out << "{";
+ for (SequenceSet::Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ if (i != set.ranges.begin()) out << ", ";
+ out << i->start.getValue() << "-" << i->end.getValue();
+ }
+ out << "}";
+ return out;
+}
+
+}
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h?rev=633108&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h Mon Mar 3 06:49:06 2008
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _framing_SequenceSet_h
+#define _framing_SequenceSet_h
+
+#include <ostream>
+#include <list>
+#include "amqp_types.h"
+#include "Buffer.h"
+#include "SequenceNumber.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace framing {
+
+// A set of sequence numbers stored as a list of closed ranges, with support
+// for encoding to / decoding from a Buffer.
+class SequenceSet
+{
+ // A closed interval [start, end] of sequence numbers.
+ struct Range
+ {
+ SequenceNumber start;
+ SequenceNumber end;
+
+ Range(SequenceNumber s, SequenceNumber e);
+ // true if i is within [start, end]
+ bool contains(SequenceNumber i) const;
+ // true if this range and r share at least one value
+ bool intersects(const Range& r) const;
+ // widens this range to cover r if the two can be combined; returns whether it did
+ bool merge(const Range& r);
+ // true if r is within this range or directly precedes its start
+ bool mergeable(const SequenceNumber& r) const;
+ // writes start then end to buffer
+ void encode(Buffer& buffer) const;
+ };
+
+ typedef std::list<Range> Ranges;
+ Ranges ranges;
+
+public:
+ SequenceSet() {}
+ // Constructs a set holding the single value s. Not explicit, so a
+ // SequenceNumber converts implicitly to a SequenceSet.
+ SequenceSet(const SequenceNumber& s) { add(s); }
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ // encoded size in bytes (not the number of elements)
+ uint32_t size() const;
+
+ bool contains(const SequenceNumber& s) const;
+ void add(const SequenceNumber& s);
+ void add(const SequenceNumber& start, const SequenceNumber& end);
+ void add(const SequenceSet& set);
+ void remove(const SequenceNumber& s);
+ void remove(const SequenceNumber& start, const SequenceNumber& end);
+ void remove(const SequenceSet& set);
+
+ void clear();
+ bool empty() const;
+
+ // Calls t(start, end) once for each stored range, in list order.
+ // t is taken by non-const reference, so it must be an lvalue.
+ template <class T>
+ void for_each(T& t) const
+ {
+ for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ t(i->start, i->end);
+ }
+ }
+
+ friend std::ostream& operator<<(std::ostream&, const SequenceSet&);
+};
+
+
+}} // namespace qpid::framing
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h Mon Mar 3 06:49:06 2008
@@ -65,6 +65,7 @@
class FramingContent;
class FieldTable;
class SequenceNumberSet;
+class SequenceSet;
class Uuid;
}} // namespace qpid::framing
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types_full.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types_full.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types_full.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types_full.h Mon Mar 3 06:49:06 2008
@@ -34,6 +34,7 @@
#include "FramingContent.h"
#include "FieldTable.h"
#include "SequenceNumberSet.h"
+#include "SequenceSet.h"
#include "Uuid.h"
#endif /*!_framing_amqp_types_decl_h*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Mon Mar 3 06:49:06 2008
@@ -94,7 +94,7 @@
std::queue<framing::AMQFrame> frameQueue;
Mutex frameQueueLock;
bool frameQueueClosed;
- bool initiated;
+ bool isInitiated;
bool readError;
std::string identifier;
bool isClient;
@@ -105,7 +105,7 @@
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
- initiated(false),
+ isInitiated(false),
readError(false),
isClient(false)
{}
@@ -128,6 +128,8 @@
void send(framing::AMQFrame&);
void close();
void activateOutput();
+ void initiated(const framing::ProtocolInitiation&);
+
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -259,13 +261,18 @@
aio->notifyPendingWrite();
}
+// Passes the given protocol initiation to write().
+// NOTE(review): presumably this sends the protocol header back to the peer --
+// confirm against write()'s implementation.
+void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
+{
+ write(pi);
+}
+
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- if(initiated){
+ if(isInitiated){
framing::AMQFrame frame;
try{
while(frame.decode(in)) {
@@ -282,7 +289,7 @@
if(protocolInit.decode(in)){
QPID_LOG(debug, "INIT [" << identifier << "]");
inputHandler->initiated(protocolInit);
- initiated = true;
+ isInitiated = true;
}
}
// TODO: unreading needs to go away, and when we can cope
@@ -324,10 +331,10 @@
}
void AsynchIOHandler::idle(AsynchIO&){
- if (isClient && !initiated) {
+ if (isClient && !isInitiated) {
//get & write protocol header from upper layers
write(inputHandler->getInitiation());
- initiated = true;
+ isInitiated = true;
return;
}
ScopedLock<Mutex> l(frameQueueLock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h Mon Mar 3 06:49:06 2008
@@ -22,6 +22,7 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
#include "OutputControl.h"
namespace qpid {
@@ -30,7 +31,7 @@
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler
{
public:
virtual void close() = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=633108&r1=633107&r2=633108&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Mar 3 06:49:06 2008
@@ -38,6 +38,7 @@
InlineVector.cpp \
ISList.cpp IList.cpp \
ClientSessionTest.cpp \
+ SequenceSet.cpp \
serialize.cpp \
ProxyTemplate.cpp apply.cpp
# FIXME aconway 2008-02-20: removed RefCountedMap.cpp due to valgrind error.
Added: incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp?rev=633108&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp Mon Mar 3 06:49:06 2008
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceSet.h"
+#include "unit_test.h"
+
+QPID_AUTO_TEST_SUITE(SequenceSetTestSuite)
+
+using namespace qpid::framing;
+
+// Checks that single values and ranges added to a set, and one set added to
+// another, are reported by contains(), and that the gaps between them are not.
+BOOST_AUTO_TEST_CASE(testAdd) {
+    SequenceSet s;
+    s.add(2);
+    s.add(8,8);
+    s.add(3,5);
+
+    for (uint32_t i = 0; i <= 1; i++) //0, 1
+        BOOST_CHECK(!s.contains(i));
+
+    for (uint32_t i = 2; i <= 5; i++) //2, 3, 4 & 5
+        BOOST_CHECK(s.contains(i));
+
+    //this loop previously iterated 0..1 again, so the 6-7 gap was never checked
+    for (uint32_t i = 6; i <= 7; i++) //6, 7
+        BOOST_CHECK(!s.contains(i));
+
+    BOOST_CHECK(s.contains(8));//8
+
+    SequenceSet t;
+    t.add(6, 10);
+    t.add(s);
+
+    for (uint32_t i = 0; i <= 1; i++)
+        BOOST_CHECK(!t.contains(i));
+
+    for (uint32_t i = 2; i <= 10; i++)
+        BOOST_CHECK(t.contains(i));
+}
+
+// Checks removal of a single value, of ranges, and of a whole set.
+// After the removals s should hold {0-2, 6, 8}; t starts as 0-10 and has s
+// removed from it, so t should hold the complement within 0-10: {3-5, 7, 9-10}.
+BOOST_AUTO_TEST_CASE(testRemove) {
+ SequenceSet s;
+ SequenceSet t;
+ s.add(0, 10);
+ t.add(0, 10);
+
+ s.remove(7);
+ s.remove(3, 5);
+ s.remove(9, 10);
+
+ t.remove(s);
+
+ for (uint32_t i = 0; i <= 2; i++) {
+ BOOST_CHECK(s.contains(i));
+ BOOST_CHECK(!t.contains(i));
+ }
+
+ for (uint32_t i = 3; i <= 5; i++) {
+ BOOST_CHECK(!s.contains(i));
+ BOOST_CHECK(t.contains(i));
+ }
+
+ BOOST_CHECK(s.contains(6));
+ BOOST_CHECK(!t.contains(6));
+
+ BOOST_CHECK(!s.contains(7));
+ BOOST_CHECK(t.contains(7));
+
+ BOOST_CHECK(s.contains(8));
+ BOOST_CHECK(!t.contains(8));
+
+ for (uint32_t i = 9; i <= 10; i++) {
+ BOOST_CHECK(!s.contains(i));
+ BOOST_CHECK(t.contains(i));
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SequenceSet.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=633108&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (added)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Mon Mar 3 06:49:06 2008
@@ -0,0 +1,585 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+
+<amqp major="99" minor="0" port="5672">
+
+ <domain name="mediumstr" type="mediumstr" label="string with 16bit size field" />
+
+ <domain name="sequence-set" type="sequence-set" label="ranged set representation">
+ <doc>
+ Set of pairs of RFC-1982 numbers representing a discontinuous range. Each pair represents a
+ closed interval within the list.
+
+ For example, the set (1,3), (6,6), (8,9) represents the sequence 1,2,3,6,8,9.
+ </doc>
+ </domain>
+
+<class name = "connection010" index = "1">
+
+<method name = "start" index="1">
+ <doc>new start method</doc>
+ <chassis name="client" implement="MUST" />
+
+ <response name="start-ok" />
+
+ <field name="server-properties" domain="table" label="server properties">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="mechanisms" domain="longstr" label="available security mechanisms">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="locales" domain="longstr" label="available message locales">
+ </field>
+
+</method>
+
+<method name = "start-ok" index="2">
+ <doc>new start-ok method</doc>
+ <chassis name="server" implement="MUST" />
+
+ <field name="client-properties" domain="table" label="client properties">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="mechanism" domain="shortstr" label="chosen security mechanism">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="response" domain="longstr" label="security response data">
+ <doc>blah blah</doc>
+ </field>
+
+ <field name="locale" domain="shortstr" label="chosen locale">
+ <doc>blah, blah</doc>
+ </field>
+
+</method>
+
+ <method name="secure" synchronous="1" index="3" label="security mechanism challenge">
+ <doc>
+ The SASL protocol works by exchanging challenges and responses until both peers have
+ received sufficient information to authenticate each other. This method challenges the
+ client to provide more information.
+ </doc>
+
+ <chassis name="client" implement="MUST" />
+
+ <response name="secure-ok" />
+
+ <field name="challenge" domain="longstr" label="security challenge data">
+ <doc>
+ Challenge information, a block of opaque binary data passed to the security mechanism.
+ </doc>
+ </field>
+ </method>
+
+ <!-- - Method: connection.secure-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="secure-ok" synchronous="1" index="4" label="security mechanism response">
+ <doc>
+ This method attempts to authenticate, passing a block of SASL data for the security
+ mechanism at the server side.
+ </doc>
+
+ <chassis name="server" implement="MUST" />
+
+ <field name="response" domain="longstr" label="security response data">
+ <doc>
+ A block of opaque data passed to the security mechanism. The contents of this data are
+ defined by the SASL security mechanism.
+ </doc>
+ <assert check="notnull" />
+ </field>
+ </method>
+
+ <method name="tune" synchronous="1" index="5" label="propose connection tuning parameters">
+ <doc>
+ This method proposes a set of connection configuration values to the client. The client can
+ accept and/or adjust these.
+ </doc>
+
+ <chassis name="client" implement="MUST" />
+
+ <response name="tune-ok" />
+
+ <field name="channel-max" domain="short" label="proposed maximum channels">
+ <doc>
+ The maximum total number of channels that the server allows per connection. Zero means
+ that the server does not impose a fixed limit, but the number of allowed channels may be
+ limited by available server resources.
+ </doc>
+ </field>
+
+ <field name="frame-max" domain="long" label="proposed maximum frame size">
+ <doc>
+ The largest frame size that the server proposes for the connection. The client can
+ negotiate a lower value. Zero means that the server does not impose any specific limit but
+ may reject very large frames if it cannot allocate resources for them.
+ </doc>
+
+ <rule name="minimum">
+ <doc>
+ Until the frame-max has been negotiated, both peers MUST accept frames of up to
+ frame-min-size octets large, and the minimum negotiated value for frame-max is also
+ frame-min-size.
+ </doc>
+ <doc type="scenario">
+ Client connects to server and sends a large properties field, creating a frame of
+ frame-min-size octets. The server must accept this frame.
+ </doc>
+ </rule>
+ </field>
+
+ <field name="heartbeat" domain="short" label="desired heartbeat delay">
+ <!-- TODO 0.82 - the heartbeat negotiation mechanism was changed during implementation
+ because the model documented here does not actually work properly. The best model we
+ found is that the server proposes a heartbeat value to the client; the client can reply
+ with zero, meaning 'do not use heartbeats' (as documented here), or can propose its own
+ heartbeat value, which the server should then accept. This is different from the model
+ here which is disconnected - e.g. each side requests a heartbeat independently. Basically
+ a connection is heartbeated in both ways, or not at all, depending on whether both peers
+ support heartbeating or not, and the heartbeat value should itself be chosen by the client
+ so that remote links can get a higher value. Also, the actual heartbeat mechanism needs
+ documentation, and is as follows: so long as there is activity on a connection - in or out
+ - both peers assume the connection is active. When there is no activity, each peer must
+ send heartbeat frames. When no heartbeat frame is received after N cycles (where N is at
+ least 2), the connection can be considered to have died. /PH 2006/07/19
+ -->
+ <doc>
+ The delay, in seconds, of the connection heartbeat that the server wants. Zero means the
+ server does not want a heartbeat.
+ </doc>
+ </field>
+ </method>
+
+ <!-- - Method: connection.tune-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="tune-ok" synchronous="1" index="6"
+ label="negotiate connection tuning parameters">
+ <doc>
+ This method sends the client's connection tuning parameters to the server. Certain fields
+ are negotiated, others provide capability information.
+ </doc>
+
+ <chassis name="server" implement="MUST" />
+
+ <field name="channel-max" domain="short" label="negotiated maximum channels">
+ <doc>
+ The maximum total number of channels that the client will use per connection.
+ </doc>
+
+ <rule name="upper-limit">
+ <doc>
+ If the client specifies a channel max that is higher than the value provided by the
+ server, the server MUST close the connection without attempting a negotiated close. The
+ server may report the error in some fashion to assist implementors.
+ </doc>
+ </rule>
+
+ <assert check="notnull" />
+ <assert check="le" value="channel-max" />
+ </field>
+
+ <field name="frame-max" domain="long" label="negotiated maximum frame size">
+ <doc>
+ The largest frame size that the client and server will use for the connection. Zero means
+ that the client does not impose any specific limit but may reject very large frames if it
+ cannot allocate resources for them. Note that the frame-max limit applies principally to
+ content frames, where large contents can be broken into frames of arbitrary size.
+ </doc>
+
+ <rule name="minimum">
+ <doc>
+ Until the frame-max has been negotiated, both peers MUST accept frames of up to
+ frame-min-size octets large, and the minimum negotiated value for frame-max is also
+ frame-min-size.
+ </doc>
+ </rule>
+
+ <rule name="upper-limit">
+ <doc>
+ If the client specifies a frame max that is higher than the value provided by the
+ server, the server MUST close the connection without attempting a negotiated close. The
+ server may report the error in some fashion to assist implementors.
+ </doc>
+ </rule>
+ </field>
+
+ <field name="heartbeat" domain="short" label="desired heartbeat delay">
+ <doc>
+ The delay, in seconds, of the connection heartbeat that the client wants. Zero means the
+ client does not want a heartbeat.
+ </doc>
+ </field>
+ </method>
+
+ <method name="open" synchronous="1" index="7" label="open connection to virtual host">
+ <doc>
+ This method opens a connection to a virtual host, which is a collection of resources, and
+ acts to separate multiple application domains within a server. The server may apply
+ arbitrary limits per virtual host, such as the number of each type of entity that may be
+ used, per connection and/or in total.
+ </doc>
+
+ <chassis name="server" implement="MUST" />
+
+ <response name="open-ok" />
+ <response name="redirect" />
+
+ <field name="virtual-host" domain="path" label="virtual host name">
+ <!-- TODO 0.82 - the entire vhost model needs review. This concept was prompted by the HTTP
+ vhost concept but does not fit very well into AMQP. Currently we use the vhost as a
+ "cluster identifier" which is inaccurate usage. /PH 2006/07/19
+ -->
+ <doc>
+ The name of the virtual host to work with.
+ </doc>
+
+ <rule name="separation">
+ <doc>
+ If the server supports multiple virtual hosts, it MUST enforce a full separation of
+ exchanges, queues, and all associated entities per virtual host. An application,
+ connected to a specific virtual host, MUST NOT be able to access resources of another
+ virtual host.
+ </doc>
+ </rule>
+
+ <rule name="security">
+ <doc>
+ The server SHOULD verify that the client has permission to access the specified virtual
+ host.
+ </doc>
+ </rule>
+ <assert check="regexp" value="^[a-zA-Z0-9/-_]+$" />
+ </field>
+
+ <field name="capabilities" domain="shortstr" label="required capabilities">
+ <doc>
+ The client can specify zero or more capability names, delimited by spaces. The server can
+ use this string to determine how to process the client's connection request.
+ </doc>
+ </field>
+
+ <field name="insist" domain="bit" label="insist on connecting to server">
+ <doc>
+ In a configuration with multiple collaborating servers, the server may respond to a
+ Connection.Open method with a Connection.Redirect. The insist option tells the server that
+ the client is insisting on a connection to the specified server.
+ </doc>
+ <rule name="behaviour">
+ <doc>
+ When the client uses the insist option, the server MUST NOT respond with a
+ Connection.Redirect method. If it cannot accept the client's connection request it
+ should respond by closing the connection with a suitable reply code.
+ </doc>
+ </rule>
+ </field>
+ </method>
+
+ <method name="open-ok" synchronous="1" index="8" label="signal that connection is ready">
+ <doc>
+ This method signals to the client that the connection is ready for use.
+ </doc>
+
+ <chassis name="client" implement="MUST" />
+
+ <field name="known-hosts" domain="known-hosts" />
+ </method>
+
+ <method name="redirect" synchronous="1" index="9" label="redirects client to other server">
+ <doc>
+ This method redirects the client to another server, based on the requested virtual host
+ and/or capabilities.
+ </doc>
+
+ <rule name="usage">
+ <doc>
+ When getting the Connection.Redirect method, the client SHOULD reconnect to the host
+ specified, and if that host is not present, to any of the hosts specified in the
+ known-hosts list.
+ </doc>
+ </rule>
+
+ <chassis name="client" implement="MUST" />
+
+ <field name="host" domain="shortstr" label="server to connect to">
+ <doc>
+ Specifies the server to connect to. This is an IP address or a DNS name, optionally
+ followed by a colon and a port number. If no port number is specified, the client should
+ use the default port number for the protocol.
+ </doc>
+ <assert check="notnull" />
+ </field>
+
+ <field name="known-hosts" domain="known-hosts" />
+ </method>
+
+<method name = "heartbeat" index="10">
+ <doc>new heartbeat method</doc>
+ <chassis name="server" implement="MUST" />
+</method>
+
+ <!-- - Method: connection.close - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="close" synchronous="1" index="11" label="request a connection close">
+ <doc>
+ This method indicates that the sender wants to close the connection. This may be due to
+ internal conditions (e.g. a forced shut-down) or due to an error handling a specific method,
+ i.e. an exception. When a close is due to an exception, the sender provides the class and
+ method id of the method which caused the exception.
+ </doc>
+ <!-- TODO: The connection close mechanism needs to be reviewed from the ODF documentation and
+ better expressed as rules here. /PH 2006/07/20
+ -->
+
+ <rule name="stability">
+ <doc>
+ After sending this method any received method except the Close-OK method MUST be
+ discarded.
+ </doc>
+ </rule>
+
+ <chassis name="client" implement="MUST" />
+ <chassis name="server" implement="MUST" />
+
+ <response name="close-ok" />
+
+ <field name="reply-code" domain="reply-code" />
+ <field name="reply-text" domain="reply-text" />
+
+ <field name="class-id" domain="class-id" label="failing method class">
+ <doc>
+ When the close is provoked by a method exception, this is the class of the method.
+ </doc>
+ </field>
+
+ <field name="method-id" domain="method-id" label="failing method ID">
+ <doc>
+ When the close is provoked by a method exception, this is the ID of the method.
+ </doc>
+ </field>
+ </method>
+
+ <!-- - Method: connection.close-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="close-ok" synchronous="1" index="12" label="confirm a connection close">
+ <doc>
+ This method confirms a Connection.Close method and tells the recipient that it is safe to
+ release resources for the connection and close the socket.
+ </doc>
+
+ <rule name="reporting">
+ <doc>
+ A peer that detects a socket closure without having received a Close-Ok handshake method
+ SHOULD log the error.
+ </doc>
+ </rule>
+
+ <chassis name="client" implement="MUST" />
+ <chassis name="server" implement="MUST" />
+ </method>
+
+
+</class>
+
+
+
+<class name = "session010" index = "2">
+
+<method name = "attach" index="1">
+
+ <doc>blah, blah</doc>
+ <chassis name="client" implement="MUST" />
+ <chassis name="server" implement="MUST" />
+ <!-- NOTE(review): session010 defines no "start-ok" method; this response looks copied from connection010.start. Confirm the intended response (presumably "attached"). -->
+ <response name="start-ok" />
+
+ <field name="name" domain="mediumstr" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="force" domain="bit" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+</method>
+
+<method name = "attached" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="name" domain="mediumstr" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+</method>
+
+<method name = "detach" index="3">
+
+ <doc>blah, blah</doc>
+ <chassis name="client" implement="MUST" />
+ <chassis name="server" implement="MUST" />
+ <!-- NOTE(review): session010 defines no "start-ok" method; this response looks copied from connection010.start. Confirm the intended response (presumably "detached"). -->
+ <response name="start-ok" />
+
+ <field name="name" domain="mediumstr" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+</method>
+
+<method name = "detached" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="name" domain="mediumstr" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+
+ <field name="detach-code" domain="octet" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+</method>
+
+<method name = "request-timeout" index="5">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="timeout" domain="long" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "timeout" index="6">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="timeout" domain="long" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+
+<method name = "command-point" index="7">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="command-id" domain="rfc1982-long" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+
+ <field name="command-offset" domain="longlong" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "expected" index="8">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="commands" domain="sequence-set" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="fragments" domain="array" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "confirmed" index="9">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="commands" domain="sequence-set" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="fragments" domain="array" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "completed" index="10">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="commands" domain="sequence-set" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+
+ <field name="timely-reply" domain="bit" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "known-completed" index="11">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="commands" domain="sequence-set" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "flush" index="12">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="expected" domain="bit" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+ <field name="confirmed" domain="bit" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+ <field name="completed" domain="bit" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+<method name = "gap" index="13">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="commands" domain="sequence-set" label="blah">
+ <doc>blah, blah</doc>
+ </field>
+</method>
+
+</class>
+
+</amqp>
Propchange: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml