You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/28 21:38:23 UTC

svn commit: r570538 [2/4] - in /incubator/qpid/trunk/qpid: cpp/rubygen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/tests/ python/ python/qpid/ python/tests_0-10/ specs/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Aug 28 12:38:17 2007
@@ -20,7 +20,10 @@
  */
 
 #include "SemanticHandler.h"
+
+#include "boost/format.hpp"
 #include "BrokerAdapter.h"
+#include "MessageDelivery.h"
 #include "qpid/framing/ChannelAdapter.h"
 #include "qpid/framing/ChannelCloseOkBody.h"
 #include "qpid/framing/ExecutionCompleteBody.h"
@@ -32,18 +35,16 @@
 using namespace qpid::sys;
 
 SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : 
-    connection(c),
-    channel(c, *this, id, &c.broker.getStore())
+    connection(c), channel(c, *this, id)
 {
     init(id, connection.getOutput(), connection.getVersion());
     adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
 }
 
-
 void SemanticHandler::handle(framing::AMQFrame& frame) 
 {    
-    //TODO: assembly etc when move to 0-10 framing
-    //
+    //TODO: assembly for method and headers
+
     //have potentially three separate tracks at this point:
     //
     // (1) execution controls
@@ -51,46 +52,43 @@
     // (3) data i.e. content-bearing commands
     //
     //framesets on each can be interleaved. framesets on the latter
-    //two share a command-id sequence.
+    //two share a command-id sequence. controls on the first track are
+    //used to communicate details about that command-id sequence.
     //
     //need to decide what to do if a frame on the command track
     //arrives while a frameset on the data track is still
     //open. execute it (i.e. out-of order execution with respect to
-    //the command id sequence) or queue it up.
+    //the command id sequence) or queue it up?
 
-    //if ready to execute (i.e. if segment is complete or frame is
-    //message content):
-    handleBody(frame.getBody());
-}
-
-//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
-{
-    try {
-        if (!method->invoke(this)) {
-            //temporary hack until channel management is moved to its own handler:
-            if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
-                ++(incoming.lwm);
-            }
+    try{
 
-            //else do the usual:
-            handleL4(method);
-            //(if the frameset is complete) we can move the execution-mark
-            //forward 
-            
-            //temporary hack until channel management is moved to its own handler:
-            if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
-                //TODO: need to account for async store opreations
-                //when this command is a message publication
-                ++(incoming.hwm);                
+        TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
+        
+        switch(track) {   
+        case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
+            handleL2(frame.castBody<AMQMethodBody>());
+            break;
+        case EXECUTION_CONTROL_TRACK:
+            handleL3(frame.castBody<AMQMethodBody>());
+            break;
+        case MODEL_COMMAND_TRACK:
+            if (!isOpen()) {
+                throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
             }
-            
-            //note: need to be more sophisticated than this if we execute
-            //commands that arrive within an active message frameset (that
-            //can't happen until 0-10 framing is implemented)
+            handleCommand(frame.castBody<AMQMethodBody>());
+            break;
+        case MODEL_CONTENT_TRACK:
+            handleContent(frame);
+            break;
         }
+        
+    }catch(const ChannelException& e){
+        adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
+        connection.closeChannel(getId());
+    }catch(const ConnectionException& e){
+        connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
     }catch(const std::exception& e){
-        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+        connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
     }
 }
 
@@ -102,7 +100,6 @@
         outgoing.lwm = mark;
         //ack messages:
         channel.ackCumulative(mark.getValue());
-        //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
     }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
@@ -116,7 +113,6 @@
 void SemanticHandler::flush()
 {
     //flush doubles as a sync to begin with - send an execution.complete
-    incoming.lwm = incoming.hwm;
     if (isOpen()) {
         Mutex::ScopedLock l(outLock);
         ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
@@ -142,52 +138,59 @@
     //never actually sent by client at present
 }
 
-void SemanticHandler::handleL4(framing::AMQMethodBody* method)
+void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
 {
-    try{
-        if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
-            if (!method->isA<ChannelCloseOkBody>()) {
-                std::stringstream out;
-                out << "Attempt to use unopened channel: " << getId();
-                throw ConnectionException(504, out.str());
-            }
-        } else {
-            InvocationVisitor v(adapter.get());
-            method->accept(v);
-            if (!v.wasHandled()) {
-                throw ConnectionException(540, "Not implemented");
-            } else if (v.hasResult()) {
-                ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
-            }
-        }
-    }catch(const ChannelException& e){
-        adapter->getProxy().getChannel().close(
-            e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-        connection.closeChannel(getId());
-    }catch(const ConnectionException& e){
-        connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+    ++(incoming.lwm);                        
+    InvocationVisitor v(adapter.get());
+    method->accept(v);
+    //TODO: need to account for async store operations and interleaving
+    ++(incoming.hwm);                                    
+    
+    if (!v.wasHandled()) {
+        throw ConnectionException(540, "Not implemented");
+    } else if (v.hasResult()) {
+        ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
     }
 }
 
-bool SemanticHandler::isOpen() const 
-{ 
-    return channel.isOpen(); 
+void SemanticHandler::handleL2(framing::AMQMethodBody* method)
+{
+    if(!method->isA<ChannelOpenBody>() && !isOpen()) {
+        if (!method->isA<ChannelCloseOkBody>()) {
+            throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
+        }
+    } else {
+        method->invoke(adapter->getChannelHandler());
+    }
 }
 
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body) 
+void SemanticHandler::handleL3(framing::AMQMethodBody* method)
 {
-    channel.handleHeader(body);
+    if (!method->invoke(this)) {
+        throw ConnectionException(540, "Not implemented");
+    }
 }
 
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body) 
+void SemanticHandler::handleContent(AMQFrame& frame)
 {
-    channel.handleContent(body);
+    Message::shared_ptr msg(msgBuilder.getMessage());
+    if (!msg) {//start of frameset will be indicated by frame flags
+        msgBuilder.start(++(incoming.lwm));
+        msg = msgBuilder.getMessage();
+    }
+    msgBuilder.handle(frame);
+    if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+        msg->setPublisher(&connection);
+        channel.handle(msg);
+        msgBuilder.end();
+        //TODO: need to account for async store operations and interleaving
+        ++(incoming.hwm);                
+    }
 }
 
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body) 
-{
-    channel.handleHeartbeat(body);
+bool SemanticHandler::isOpen() const 
+{ 
+    return channel.isOpen(); 
 }
 
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -195,14 +198,13 @@
     Mutex::ScopedLock l(outLock);
     SequenceNumber copy(outgoing.hwm);
     ++copy;
-    msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
-    //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+    MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
     return outgoing.hwm.getValue();
 }
 
 void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
 {
-    msg->deliver(*this, tag, token, connection.getFrameMax());
+    MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
 }
 
 void SemanticHandler::send(const AMQBody& body)
@@ -214,3 +216,49 @@
     }
     ChannelAdapter::send(body);
 }
+
+uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
+{
+    return frame.getBody()->type() == METHOD_BODY ?  frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
+}
+
+uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
+{
+    return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
+}
+
+SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
+{
+    //will be replaced by field in 0-10 frame header
+    uint8_t type = frame.getBody()->type();
+    uint16_t classId;
+    switch(type) {
+    case METHOD_BODY:
+        if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
+            return MODEL_CONTENT_TRACK;
+        }
+
+        classId = frame.castBody<AMQMethodBody>()->amqpClassId();
+        switch (classId) {
+        case ChannelOpenBody::CLASS_ID:
+            return SESSION_CONTROL_TRACK;
+        case ExecutionCompleteBody::CLASS_ID:
+            return EXECUTION_CONTROL_TRACK;
+        }
+
+        return MODEL_COMMAND_TRACK;
+    case HEADER_BODY:
+    case CONTENT_BODY:
+        return MODEL_CONTENT_TRACK;
+    }
+    throw Exception("Could not determine track");
+}
+
+//ChannelAdapter virtual methods, no longer used:
+void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
+
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
+
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
+
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Aug 28 12:38:17 2007
@@ -25,6 +25,7 @@
 #include "BrokerChannel.h"
 #include "Connection.h"
 #include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/FrameHandler.h"
@@ -55,8 +56,17 @@
     framing::Window incoming;
     framing::Window outgoing;
     sys::Mutex outLock;
+    MessageBuilder msgBuilder;
 
-    void handleL4(framing::AMQMethodBody* method);
+    enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+    TrackId getTrack(const framing::AMQFrame& frame);
+    uint16_t getClassId(const framing::AMQFrame& frame);
+    uint16_t getMethodId(const framing::AMQFrame& frame);
+
+    void handleL3(framing::AMQMethodBody* method);
+    void handleL2(framing::AMQMethodBody* method);
+    void handleCommand(framing::AMQMethodBody* method);
+    void handleContent(framing::AMQFrame& frame);
 
     //ChannelAdapter virtual methods:
     void handleMethod(framing::AMQMethodBody* method);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Aug 28 12:38:17 2007
@@ -25,7 +25,6 @@
 #include <vector>
 #include "BrokerExchange.h"
 #include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
 #include "qpid/sys/Monitor.h"
 #include "BrokerQueue.h"
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Aug 28 12:38:17 2007
@@ -24,10 +24,10 @@
 #include <algorithm>
 #include <functional>
 #include <list>
+#include "BrokerQueue.h"
 #include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
 #include "MessageStore.h"
-#include "BrokerQueue.h"
 #include "TxOp.h"
 
 namespace qpid {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Tue Aug 28 12:38:17 2007
@@ -75,7 +75,7 @@
     id = _id;
 
     setState(OPENING);
-    AMQFrame f(version, id, ChannelOpenBody(version));
+    AMQFrame f(id, ChannelOpenBody(version));
     out(f);
 
     std::set<int> states;
@@ -90,7 +90,7 @@
 void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
 {
     setState(CLOSING);
-    AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
+    AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
     out(f);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Tue Aug 28 12:38:17 2007
@@ -181,8 +181,8 @@
     if (response.isA<BasicGetEmptyBody>()) {
         return false;
     } else {
-        ReceivedContent::shared_ptr content = gets.pop();
-        content->populate(msg);
+        FrameSet::shared_ptr content = gets.pop();
+        msg.populate(*content);
         return true;
     }
 }
@@ -232,13 +232,13 @@
 void Channel::run() {
     try {
         while (true) {
-            ReceivedContent::shared_ptr content = session->get();
+            FrameSet::shared_ptr content = session->get();
             //need to dispatch this to the relevant listener:
             if (content->isA<BasicDeliverBody>()) {
                 ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
                 if (i != consumers.end()) {
                     Message msg;
-                    content->populate(msg);
+                    msg.populate(*content);
                     i->second.listener->received(msg);
                 } else {
                     QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());                        

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Tue Aug 28 12:38:17 2007
@@ -83,7 +83,7 @@
     std::auto_ptr<Session> session;
     SessionCore::shared_ptr sessionCore;
     framing::ChannelId channelId;
-    BlockingQueue<ReceivedContent::shared_ptr> gets;
+    BlockingQueue<framing::FrameSet::shared_ptr> gets;
     framing::Uuid uniqueId;
     uint32_t nameCounter;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Tue Aug 28 12:38:17 2007
@@ -23,8 +23,13 @@
  */
 #include <string>
 #include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
 
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
 namespace qpid {
 namespace client {
 
@@ -54,6 +59,17 @@
     void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
 
     const HeaderProperties& getMethodHeaders() const { return *this; }
+
+
+    //TODO: move this elsewhere (GRS 24/08/2007)
+    void populate(framing::FrameSet& frameset)
+    {
+        const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
+        if (properties) {
+            BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
+        }
+        frameset.getContent(data);
+    }
 
   private:
     std::string data;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Aug 28 12:38:17 2007
@@ -109,7 +109,7 @@
 
 void ConnectionHandler::send(const framing::AMQBody& body)
 {
-    AMQFrame f(ProtocolVersion(), 0, body);
+    AMQFrame f(0, body);
     out(f);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Aug 28 12:38:17 2007
@@ -107,7 +107,7 @@
 
 void ConnectionImpl::idleOut()
 {
-    AMQFrame frame(version, 0, new AMQHeartbeatBody());
+    AMQFrame frame(0, new AMQHeartbeatBody());
     connector->send(frame);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Aug 28 12:38:17 2007
@@ -180,7 +180,7 @@
 		inbuf.move(received);
 		inbuf.flip();//position = 0, limit = total data read
 		
-		AMQFrame frame(version);
+		AMQFrame frame;
 		while(frame.decode(inbuf)){
                     QPID_LOG(trace, "RECV: " << frame);
 		    input->received(frame);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Tue Aug 28 12:38:17 2007
@@ -64,9 +64,9 @@
     if (!invoke(body, this)) {
         if (isContentFrame(frame)) {
             if (!arriving) {
-                arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+                arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
             }
-            arriving->append(body);
+            arriving->append(frame);
             if (arriving->isComplete()) {
                 received.push(arriving);
                 arriving.reset();
@@ -123,7 +123,7 @@
 
 void ExecutionHandler::sendFlush()
 {
-    AMQFrame frame(version, 0, ExecutionFlushBody());
+    AMQFrame frame(0, ExecutionFlushBody());
     out(frame);        
 }
 
@@ -139,8 +139,7 @@
         correlation.listen(g);
     }
 
-    AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
-                   command);
+    AMQFrame frame(0/*id will be filled in by channel handler*/, command);
     out(frame);
 }
 
@@ -149,10 +148,10 @@
 {
     send(command, f, g);
 
-    AMQHeaderBody header(BASIC);
-    BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
-    header.setContentSize(data.size());
-    AMQFrame h(version, 0, header);
+    AMQHeaderBody header;
+    BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
+    header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
+    AMQFrame h(0, header);
     out(h);
 
     u_int64_t data_length = data.length();
@@ -160,7 +159,7 @@
         //frame itself uses 8 bytes
         u_int32_t frag_size = maxFrameSize - 8;
         if(data_length < frag_size){
-            AMQFrame frame(version, 0, AMQContentBody(data));
+            AMQFrame frame(0, AMQContentBody(data));
             out(frame);
         }else{
             u_int32_t offset = 0;
@@ -168,7 +167,7 @@
             while (remaining > 0) {
                 u_int32_t length = remaining > frag_size ? frag_size : remaining;
                 string frag(data.substr(offset, length));
-                AMQFrame frame(version, 0, AMQContentBody(frag));
+                AMQFrame frame(0, AMQContentBody(frag));
                 out(frame);
                 offset += length;
                 remaining = data_length - offset;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Tue Aug 28 12:38:17 2007
@@ -23,12 +23,12 @@
 
 #include <queue>
 #include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/FrameSet.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "BlockingQueue.h"
 #include "ChainableFrameHandler.h"
 #include "CompletionTracker.h"
 #include "Correlator.h"
-#include "ReceivedContent.h"
 
 namespace qpid {
 namespace client {
@@ -39,7 +39,7 @@
 {
     framing::Window incoming;
     framing::Window outgoing;
-    ReceivedContent::shared_ptr arriving;
+    framing::FrameSet::shared_ptr arriving;
     Correlator correlation;
     CompletionTracker completion;
     framing::ProtocolVersion version;
@@ -52,7 +52,7 @@
     void sync();
 
 public:
-    BlockingQueue<ReceivedContent::shared_ptr> received; 
+    BlockingQueue<framing::FrameSet::shared_ptr> received; 
 
     ExecutionHandler(uint64_t maxFrameSize = 65536);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Tue Aug 28 12:38:17 2007
@@ -77,7 +77,7 @@
     return Response(f);
 }
 
-ReceivedContent::shared_ptr SessionCore::get()
+FrameSet::shared_ptr SessionCore::get()
 {
     return l3.received.pop();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Tue Aug 28 12:38:17 2007
@@ -25,11 +25,11 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/AMQMethodBody.h"
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
 #include "ChannelHandler.h"
 #include "ExecutionHandler.h"
 #include "FutureFactory.h"
-#include "ReceivedContent.h"
 #include "Response.h"
 
 namespace qpid {
@@ -49,7 +49,7 @@
     SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
     Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
     Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
-    ReceivedContent::shared_ptr get();
+    framing::FrameSet::shared_ptr get();
     uint16_t getId() const { return id; } 
     void setSync(bool);
     bool isSync();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 28 12:38:17 2007
@@ -94,8 +94,7 @@
 }
 
 void Cluster::notify() {
-    AMQFrame frame(ProtocolVersion(), 0,
-                   ClusterNotifyBody(ProtocolVersion(), url));
+    AMQFrame frame(0, ClusterNotifyBody(ProtocolVersion(), url));
     handle(frame);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Tue Aug 28 12:38:17 2007
@@ -51,7 +51,7 @@
     // 
     BrokerHandler(Broker& broker) :
         connection(0, broker),
-        channel(connection, *this, 1, 0),
+        channel(connection, *this, 1),
         adapter(channel, connection, broker, *this) {}
 
     void handle(AMQFrame& frame) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h Tue Aug 28 12:38:17 2007
@@ -38,6 +38,7 @@
     inline virtual ~AMQContentBody(){}
     inline uint8_t type() const { return CONTENT_BODY; };
     inline const string& getData() const { return data; }
+    inline string& getData() { return data; }
     uint32_t size() const;
     void encode(Buffer& buffer) const;
     void decode(Buffer& buffer, uint32_t size);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h Tue Aug 28 12:38:17 2007
@@ -30,7 +30,7 @@
 {
 public:
     virtual ~AMQDataBlock() {}
-    virtual void encode(Buffer& buffer) = 0; 
+    virtual void encode(Buffer& buffer) const = 0; 
     virtual bool decode(Buffer& buffer) = 0; 
     virtual uint32_t size() const = 0;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Aug 28 12:38:17 2007
@@ -70,7 +70,7 @@
     return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
 }
 
-void AMQFrame::encode(Buffer& buffer)
+void AMQFrame::encode(Buffer& buffer) const
 {
     buffer.putOctet(getBody()->type());
     buffer.putShort(channel);    
@@ -80,8 +80,11 @@
 }
 
 uint32_t AMQFrame::size() const{
-    return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
-        boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
+    return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+uint32_t AMQFrame::frameOverhead() {
+    return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/;
 }
 
 bool AMQFrame::decode(Buffer& buffer)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Aug 28 12:38:17 2007
@@ -37,14 +37,14 @@
 class AMQFrame : public AMQDataBlock
 {
   public:
-    AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+    AMQFrame() : channel(0) {}
 
     /** Construct a frame with a copy of b */
-    AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+    AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
         setBody(*b);
     }
     
-    AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+    AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
         setBody(b);
     }
     
@@ -52,21 +52,26 @@
     void setChannel(ChannelId c) { channel = c; }
 
     AMQBody* getBody();
-    const AMQBody* getBody() const;
+    const AMQBody* getBody() const;    
 
     /** Copy a body instance to the frame */
     void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
 
     /** Convenience template to cast the body to an expected type. */
     template <class T> T* castBody() {
-        boost::polymorphic_downcast<T*>(getBody());
+        return boost::polymorphic_downcast<T*>(getBody());
+    }
+
+    template <class T> const T* castBody() const {
+        return boost::polymorphic_downcast<const T*>(getBody());
     }
 
     bool empty() { return boost::get<boost::blank>(&body); }
 
-    void encode(Buffer& buffer); 
+    void encode(Buffer& buffer) const; 
     bool decode(Buffer& buffer); 
     uint32_t size() const;
+    static uint32_t frameOverhead();
 
   private:
     struct CopyVisitor : public AMQBodyConstVisitor {
@@ -77,7 +82,7 @@
         void visit(const AMQHeartbeatBody& x) { frame.body=x; }
         void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
     };
-  friend struct CopyVisitor;
+    friend struct CopyVisitor;
 
     typedef boost::variant<boost::blank,
                            AMQHeaderBody,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp Tue Aug 28 12:38:17 2007
@@ -19,37 +19,65 @@
  *
  */
 #include "AMQHeaderBody.h"
-#include "qpid/QpidError.h"
-#include "BasicHeaderProperties.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
 
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() {}
 
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody() {}
 
 uint32_t qpid::framing::AMQHeaderBody::size() const{
-    return 12 + properties.size();
+    CalculateSize visitor;
+    for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+    return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/));
 }
 
 void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
-    buffer.putShort(properties.classId());
-    buffer.putShort(weight);
-    buffer.putLongLong(contentSize);
-    properties.encode(buffer);
+    Encode visitor(buffer);
+    for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+}
+
+// Reads property sections from the next 'size' bytes of buffer; each is a 4-byte length, a 2-byte type code, then the payload.
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){
+    uint32_t limit = buffer.available() - size;
+    while (buffer.available() > limit + 2) {
+        uint32_t len = buffer.getLong();
+        uint16_t type = buffer.getShort();
+        if (len < 2) throw Exception(QPID_MSG("Invalid property length: " << len)); //len includes the 2-byte type code; smaller values would wrap 'len - 2'
+        switch(type) { //this switch could be generated as the number of options increases
+        case BasicHeaderProperties::TYPE: 
+            decode(BasicHeaderProperties(), buffer, len - 2);
+            break;
+        case MessageProperties::TYPE:
+            decode(MessageProperties(), buffer, len - 2);
+            break;
+        case DeliveryProperties::TYPE:
+            decode(DeliveryProperties(), buffer, len - 2);
+            break;
+        default: //TODO: should just skip over them keeping them for later dispatch as is
+            throw Exception(QPID_MSG("Unexpected property type: " << type));
+        }
+    }
 }
 
-void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
-    buffer.getShort();          // Ignore classId
-    weight = buffer.getShort();
-    contentSize = buffer.getLongLong();
-    properties.decode(buffer, bufSize - 12);
+uint64_t qpid::framing::AMQHeaderBody::getContentLength() const
+{    
+    const MessageProperties* mProps = get<MessageProperties>();
+    if (mProps) {
+        return mProps->getContentLength();
+    }
+    const BasicHeaderProperties* bProps = get<BasicHeaderProperties>();
+    if (bProps) {
+        return bProps->getContentLength();
+    }
+    return 0;
 }
 
 void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
 {
-    out << "header (" << size() << " bytes)"  << " content_size=" << getContentSize();
-    out << ", message_id=" << properties.getMessageId(); 
-    out << ", delivery_mode=" << (int) properties.getDeliveryMode(); 
-    out << ", headers=" << properties.getHeaders();
+    out << "header (" << size() << " bytes)";
+    out << "; properties={";
+    Print visitor(out);
+    for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+    out << "}";
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h Tue Aug 28 12:38:17 2007
@@ -22,6 +22,12 @@
 #include "AMQBody.h"
 #include "Buffer.h"
 #include "BasicHeaderProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include <iostream>
+#include <vector>
+#include <boost/variant.hpp>
+#include <boost/variant/get.hpp>
 
 #ifndef _AMQHeaderBody_
 #define _AMQHeaderBody_
@@ -31,24 +37,85 @@
 
 class AMQHeaderBody :  public AMQBody
 {
-    BasicHeaderProperties properties;
-    uint16_t weight;
-    uint64_t contentSize;
-  public:
-    AMQHeaderBody(int classId);
+    typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList; 
+
+    PropertyList properties;
+
+    template <class T> void decode(T t, Buffer& b, uint32_t size) {
+        t.decode(b, size);
+        properties.push_back(t);
+    }
+
+    class Encode : public boost::static_visitor<> {
+        Buffer& buffer;
+    public:
+        Encode(Buffer& b) : buffer(b) {}
+
+        template <class T> void operator()(T& t) const {
+            buffer.putLong(t.size() + 2/*typecode*/);
+            buffer.putShort(T::TYPE);
+            t.encode(buffer);
+        }
+    };
+
+    class CalculateSize : public boost::static_visitor<> {
+        uint32_t size;
+    public:
+        CalculateSize() : size(0) {}
+
+        template <class T> void operator()(T& t) {
+            size += t.size();
+        }
+
+        uint32_t totalSize() { 
+            return size; 
+        }        
+    };
+
+    class Print : public boost::static_visitor<> {
+        std::ostream& out;
+    public:
+        Print(std::ostream& o) : out(o) {}
+
+        template <class T> void operator()(T& t) {
+            out << t;
+        }
+    };
+
+public:
+
     AMQHeaderBody();
+    ~AMQHeaderBody();
     inline uint8_t type() const { return HEADER_BODY; }
-    BasicHeaderProperties* getProperties(){ return &properties; }
-    const BasicHeaderProperties* getProperties() const { return &properties; }
-    inline uint64_t getContentSize() const { return contentSize; }
-    inline void setContentSize(uint64_t _size) { contentSize = _size; }
-    virtual ~AMQHeaderBody();
-    virtual uint32_t size() const;
-    virtual void encode(Buffer& buffer) const;
-    virtual void decode(Buffer& buffer, uint32_t size);
-    virtual void print(std::ostream& out) const;
+
+    uint32_t size() const;
+    void encode(Buffer& buffer) const;
+    void decode(Buffer& buffer, uint32_t size);
+    uint64_t getContentLength() const;
+    void print(std::ostream& out) const;
 
     void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+
+    template <class T> T* get(bool create) { 
+        for (PropertyList::iterator i = properties.begin(); i != properties.end(); i++) {
+            T* p = boost::get<T>(&(*i));
+            if (p) return p;
+        }
+        if (create) {
+            properties.push_back(T());
+            return boost::get<T>(&(properties.back()));
+        } else {
+            return 0;
+        }
+    }
+
+    template <class T> const T* get() const { 
+        for (PropertyList::const_iterator i = properties.begin(); i != properties.end(); i++) {
+            const T* p = boost::get<T>(&(*i));
+            if (p) return p;
+        }
+        return 0;
+    }
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Tue Aug 28 12:38:17 2007
@@ -49,6 +49,7 @@
     
     virtual MethodId amqpMethodId() const = 0;
     virtual ClassId  amqpClassId() const = 0;
+    virtual bool isContentBearing() const = 0;
     
     void invoke(AMQP_ServerOperations&);
     bool invoke(Invocable*);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp Tue Aug 28 12:38:17 2007
@@ -22,7 +22,10 @@
 
 //TODO: This could be easily generated from the spec
 
-qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), 
+                                                                priority(0), 
+                                                                timestamp(0), 
+                                                                contentLength(0){}
 qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
 
 uint32_t qpid::framing::BasicHeaderProperties::size() const{
@@ -41,6 +44,7 @@
     if(userId.length() > 0) bytes += userId.length() + 1;
     if(appId.length() > 0) bytes += appId.length() + 1;
     if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+    if(contentLength != 0) bytes += 8;
 
     return bytes;
 }
@@ -63,6 +67,7 @@
     if(userId.length() > 0) buffer.putShortString(userId);
     if(appId.length() > 0) buffer.putShortString(appId);
     if(clusterId.length() > 0) buffer.putShortString(clusterId);    
+    if(contentLength != 0) buffer.putLongLong(contentLength);
 }
 
 void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){
@@ -81,6 +86,7 @@
     if(flags & (1 <<  4)) buffer.getShortString(userId);
     if(flags & (1 <<  3)) buffer.getShortString(appId);
     if(flags & (1 <<  2)) buffer.getShortString(clusterId);    
+    if(flags & (1 <<  1)) contentLength = buffer.getLongLong();    
 }
 
 uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
@@ -99,5 +105,32 @@
     if(userId.length() > 0)          flags |= (1 <<  4);
     if(appId.length() > 0)           flags |= (1 <<  3);
     if(clusterId.length() > 0)       flags |= (1 <<  2);
+    if(contentLength != 0)           flags |= (1 <<  1);
     return flags;
 }
+
+namespace qpid{
+namespace framing{
+
+    std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props) 
+    { // Streams each property that is set (non-empty string, non-zero number, non-empty headers) as "name=value;" and returns out.
+        if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";";
+        if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";";
+        if(props.headers.count() > 0) out << "headers=" << props.headers << ";";
+        if(props.deliveryMode != 0) out << "deliveryMode=" << props.deliveryMode << ";"; // NOTE(review): the print() this replaces cast deliveryMode to int first - confirm it streams as a number here
+        if(props.priority != 0) out << "priority=" << props.priority << ";"; // NOTE(review): if priority is an 8-bit integer type it is streamed as a character - confirm
+        if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";";
+        if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";";
+        if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";";
+        if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";";
+        if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";";
+        if(props.type.length() > 0) out << "type=" << props.type << ";";
+        if(props.userId.length() > 0) out << "userId=" << props.userId << ";";
+        if(props.appId.length() > 0) out << "appId=" << props.appId << ";";
+        if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";";    
+        if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";";
+
+        return out;
+    }
+
+}}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h Tue Aug 28 12:38:17 2007
@@ -47,15 +47,18 @@
     string userId;
     string appId;
     string clusterId;
+    uint64_t contentLength;
 	
     uint16_t getFlags() const;
 
   public:
+    static const uint16_t TYPE = BASIC;
+
     BasicHeaderProperties();
     virtual ~BasicHeaderProperties();
     virtual uint32_t size() const;
     virtual void encode(Buffer& buffer) const;
-    virtual void decode(Buffer& buffer, uint32_t size);
+    virtual void decode(Buffer& buffer, uint32_t size = 0);
 
     virtual uint8_t classId() const { return BASIC; }
 
@@ -74,6 +77,7 @@
     string getUserId() const { return userId; }
     string getAppId() const { return appId; }
     string getClusterId() const { return clusterId; }
+    uint64_t getContentLength() const { return contentLength; }
 
     void setContentType(const string& _type){ contentType = _type; }
     void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
@@ -89,6 +93,9 @@
     void setUserId(const string& _userId){ userId = _userId; }
     void setAppId(const string& _appId){appId = _appId; }
     void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+    void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; }
+
+    friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&);
 
     /** \internal
      * Template to copy between types like BasicHeaderProperties.
@@ -109,6 +116,7 @@
         to.setUserId(from.getUserId());
         to.setAppId(from.getAppId());
         to.setClusterId(from.getClusterId());
+        to.setContentLength(from.getContentLength());
     }
 };
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp Tue Aug 28 12:38:17 2007
@@ -51,7 +51,7 @@
 void ChannelAdapter::send(const AMQBody& body)
 {
     assertChannelOpen();
-    AMQFrame frame(getVersion(), getId(), body);
+    AMQFrame frame(getId(), body);
     handlers.out->handle(frame);
 }
 

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (from r569281, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp&r1=569281&r2=570538&rev=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Tue Aug 28 12:38:17 2007
@@ -19,87 +19,65 @@
  *
  */
 
-#include "ReceivedContent.h"
+#include "FrameSet.h"
 #include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/TypeFilter.h"
 
-using qpid::client::ReceivedContent;
 using namespace qpid::framing;
 using namespace boost;
 
-ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
 
-void ReceivedContent::append(AMQBody* part)
+void FrameSet::append(AMQFrame& part)
 {
-    parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
+    parts.push_back(part);
 }
 
-bool ReceivedContent::isComplete() const
+bool FrameSet::isComplete() const
 {
-    if (parts.empty()) {
-         return false;
-    } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
-        const AMQHeaderBody* headers(getHeaders());
-        return headers && headers->getContentSize() == getContentSize();
-    } else if (isA<MessageTransferBody>()) {
-        //no longer support references, headers and data are still method fields
-        return true;
+    //TODO: should eventually use the 0-10 frame header flags when available
+    const AMQMethodBody* method = getMethod();
+    if (!method) {
+        return false;
+    } else if (method->isContentBearing()) {
+        const AMQHeaderBody* header = getHeaders();
+        if (header) {
+            return header->getContentLength() == getContentSize();
+        } else {
+            return false;
+        }
     } else {
-        throw Exception("Unknown content class");
+        return true;        
     }
 }
 
-
-const AMQMethodBody* ReceivedContent::getMethod() const
+const AMQMethodBody* FrameSet::getMethod() const
 {
     return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
 }
 
-const AMQHeaderBody* ReceivedContent::getHeaders() const
+const AMQHeaderBody* FrameSet::getHeaders() const
 {
     return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
 }
 
-uint64_t ReceivedContent::getContentSize() const
+AMQHeaderBody* FrameSet::getHeaders()
 {
-    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
-        uint64_t size(0);
-        for (uint i = 2; i < parts.size(); i++) {
-            size += parts[i].getBody()->size();
-        }
-        return size;
-    } else if (isA<MessageTransferBody>()) {
-        return as<MessageTransferBody>()->getBody().getValue().size();
-    } else {
-        throw Exception("Unknown content class");
-    }    
+    return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
 }
 
-std::string ReceivedContent::getContent() const
+uint64_t FrameSet::getContentSize() const
 {
-    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
-        string data;
-        for (uint i = 2; i < parts.size(); i++) {
-            data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
-        }
-        return data;
-    } else if (isA<MessageTransferBody>()) {
-        return as<MessageTransferBody>()->getBody().getValue();
-    } else {
-        throw Exception("Unknown content class");
-    }
+    SumBodySize sum;
+    map_if(sum, TypeFilter(CONTENT_BODY));
+    return sum.getSize();
 }
 
-void ReceivedContent::populate(Message& msg)
+void FrameSet::getContent(std::string& out) const
 {
-    if (!isComplete()) throw Exception("Incomplete message");
-
-    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
-        const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
-        BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
-        msg.setData(getContent());
-    } else if (isA<MessageTransferBody>()) {
-        throw Exception("Transfer not yet supported");
-    } else {
-        throw Exception("Unknown content class");
-    }    
+    AccumulateContent accumulator(out);
+    map_if(accumulator, TypeFilter(CONTENT_BODY));
 }

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (from r569281, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h&r1=569281&r2=570538&rev=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Tue Aug 28 12:38:17 2007
@@ -23,49 +23,76 @@
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/SequenceNumber.h"
-#include "ClientMessage.h"
 
-#ifndef _ReceivedContent_
-#define _ReceivedContent_
+#ifndef _FrameSet_
+#define _FrameSet_
 
 namespace qpid {
-namespace client {
+namespace framing {
 
 /**
- * Collects the frames representing some received 'content'. This
- * provides a raw interface to 'message' data and attributes.
+ * Collects the frames representing a message.
  */
-class ReceivedContent
+class FrameSet
 {
-    const framing::SequenceNumber id;
-    std::vector<framing::AMQFrame> parts;
+    typedef std::vector<AMQFrame> Frames;
+    const SequenceNumber id;
+    Frames parts;
 
 public:
-    typedef boost::shared_ptr<ReceivedContent> shared_ptr;
+    typedef boost::shared_ptr<FrameSet> shared_ptr;
 
-    ReceivedContent(const framing::SequenceNumber& id);
-    void append(framing::AMQBody* part);
+    FrameSet(const SequenceNumber& id);
+    void append(AMQFrame& part);
     bool isComplete() const;
 
     uint64_t getContentSize() const;
-    std::string getContent() const;
+    void getContent(std::string&) const;
 
-    const framing::AMQMethodBody* getMethod() const;
-    const framing::AMQHeaderBody* getHeaders() const;
+    const AMQMethodBody* getMethod() const;
+    const AMQHeaderBody* getHeaders() const;
+    AMQHeaderBody* getHeaders();
      
     template <class T> bool isA() const {
-        const framing::AMQMethodBody* method=getMethod();
+        const AMQMethodBody* method = getMethod();
         return method && method->isA<T>();
     }
 
     template <class T> const T* as() const {
-        const framing::AMQMethodBody* method=getMethod();
+        const AMQMethodBody* method = getMethod();
         return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
     }    
 
-    const framing::SequenceNumber& getId() const { return id; }
+    template <class T> const T* getHeaderProperties() const {
+        const AMQHeaderBody* header = getHeaders();
+        return header ? header->get<T>() : 0;
+    }
+
+    const SequenceNumber& getId() const { return id; }
+
+    template <class P> void remove(P predicate) {
+        parts.erase(remove_if(parts.begin(), parts.end(), predicate), parts.end());
+    }
+
+    template <class F> void map(F& functor) {
+        for_each(parts.begin(), parts.end(), functor);
+    }
 
-    void populate(Message& msg);
+    template <class F> void map(F& functor) const {
+        for_each(parts.begin(), parts.end(), functor);
+    }
+
+    template <class F, class P> void map_if(F& functor, P predicate) {
+        for(Frames::iterator i = parts.begin(); i != parts.end(); i++) {
+            if (predicate(*i)) functor(*i);
+        }
+    }
+
+    template <class F, class P> void map_if(F& functor, P predicate) const {
+        for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) {
+            if (predicate(*i)) functor(*i);
+        }
+    }
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp Tue Aug 28 12:38:17 2007
@@ -31,7 +31,7 @@
 
 ProtocolInitiation::~ProtocolInitiation(){}
 
-void ProtocolInitiation::encode(Buffer& buffer){
+void ProtocolInitiation::encode(Buffer& buffer) const {
     buffer.putOctet('A');
     buffer.putOctet('M');
     buffer.putOctet('Q');

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h Tue Aug 28 12:38:17 2007
@@ -39,7 +39,7 @@
     ProtocolInitiation(uint8_t major, uint8_t minor);
     ProtocolInitiation(ProtocolVersion p);
     virtual ~ProtocolInitiation();
-    virtual void encode(Buffer& buffer); 
+    virtual void encode(Buffer& buffer) const; 
     virtual bool decode(Buffer& buffer); 
     inline virtual uint32_t size() const { return 8; }
     inline uint8_t getMajor() const { return version.getMajor(); }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Tue Aug 28 12:38:17 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SendContent.h"
+
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {} // h: handler the frames are passed to; c: channel set on them; mfs: limit for a whole frame (operator() subtracts the framing overhead)
+
+void qpid::framing::SendContent::operator()(AMQFrame& f) const // Passes f's content to handler on this functor's channel, splitting it when it exceeds one frame's payload.
+{
+    uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); // payload bytes per frame. NOTE(review): 0 when maxFrameSize == frameOverhead() (the / and % below then divide by zero) and wrapped when smaller - confirm callers always pass a larger limit
+    const AMQContentBody* body(f.castBody<AMQContentBody>()); // castBody is a boost::polymorphic_downcast, so f is expected to carry an AMQContentBody
+    if (body->size() > maxContentSize) {
+        uint32_t offset = 0;
+        for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) { // one full-size fragment per whole multiple of maxContentSize
+            sendFragment(*body, offset, maxContentSize);
+            offset += maxContentSize;
+        }
+        uint32_t remainder = body->size() % maxContentSize; // bytes left over after the full-size fragments
+        if (remainder) {
+            sendFragment(*body, offset, remainder);
+        }
+    } else { // body fits in one frame: hand over a copy so the channel is changed on the copy, not on f
+        AMQFrame copy(f);
+        copy.setChannel(channel);
+        handler.handle(copy);
+    }        
+}
+
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const // Sends up to 'size' bytes of body's data, starting at 'offset', as a new content frame.
+{
+    AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); // new frame on this functor's channel holding only the selected slice
+    handler.handle(fragment);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _SendContent_
+#define _SendContent_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Functor that sends frame to handler, refragmenting if
+ * necessary. Currently only works on content frames but this could be
+ * changed once we support multi-frame segments in general.
+ */
+class SendContent
+{
+    FrameHandler& handler; // destination for frames; no 'mutable': it is ill-formed on a reference member and const methods can already call through a reference
+    const uint16_t channel; // channel id set on every frame that is sent
+    const uint16_t maxFrameSize; // limit for a whole frame, framing overhead included
+
+    void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const; // sends one slice of body as its own content frame
+public:
+    SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
+    void operator()(AMQFrame& f) const; // sends the content frame f, as several frames if its body is too large for one
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _TypeFilter_
+#define _TypeFilter_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Predicate that selects frames by type
+ */
+class TypeFilter
+{
+    std::vector<uint8_t> types;  // type codes accepted by this predicate
+public:
+    TypeFilter(uint8_t type) { add(type); }
+    TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); }
+    void add(uint8_t type) { types.push_back(type); }  // accept one more type code
+    bool operator()(const AMQFrame& f) const
+    {   // true when f.getBody()->type() equals one of the stored codes
+        return std::find(types.begin(), types.end(), f.getBody()->type()) != types.end();  // std:: so lookup does not rely on ADL
+    }
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <ostream>
+#include <iostream>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+
+#ifndef _frame_functors_
+#define _frame_functors_
+
+namespace qpid {
+namespace framing {
+
+class SumFrameSize
+{
+    uint64_t size;  // running total of f.size() over every frame visited so far
+public:
+    SumFrameSize() : size(0) {}
+    void operator()(const AMQFrame& f) { size += f.size(); }
+    uint64_t getSize() const { return size; }  // const: reading the total must work on a const functor
+};
+
+class SumBodySize
+{
+    uint64_t size;  // running total of f.getBody()->size() over every frame visited so far
+public:
+    SumBodySize() : size(0) {}
+    void operator()(const AMQFrame& f) { size += f.getBody()->size(); }
+    uint64_t getSize() const { return size; }  // const: reading the total must work on a const functor
+};
+
+class EncodeFrame
+{
+    Buffer& target;  // buffer that every visited frame is encoded into
+public:
+    EncodeFrame(Buffer& dest) : target(dest) {}
+    void operator()(const AMQFrame& frame) { frame.encode(target); }  // delegates to AMQFrame::encode
+};
+
+class EncodeBody
+{
+    Buffer& target;  // buffer that every visited frame's body is encoded into
+public:
+    EncodeBody(Buffer& dest) : target(dest) {}
+    void operator()(const AMQFrame& frame) { frame.getBody()->encode(target); }  // body only, not the whole frame
+};
+
+class AccumulateContent
+{
+    std::string& sink;  // caller-owned string that grows with each visited frame's data
+public:
+    AccumulateContent(std::string& dest) : sink(dest) {}
+    void operator()(const AMQFrame& frame) { sink += frame.castBody<AMQContentBody>()->getData(); }  // treats the body as content
+};
+
+class Relay
+{
+    FrameHandler& target;
+    const uint16_t channelId;
+    // Hands a copy of each frame, stamped with channelId, to the target handler.
+public:
+    Relay(FrameHandler& h, uint16_t c) : target(h), channelId(c) {}
+
+    void operator()(AMQFrame& f)
+    {
+        AMQFrame relayed(f);            // setChannel is applied to this copy, not to f
+        relayed.setChannel(channelId);
+        target.handle(relayed);
+    }
+};
+
+class Print
+{
+    std::ostream& stream;
+public:
+    Print(std::ostream& o) : stream(o) {}
+    // Streams the frame with operator<< and finishes the line with std::endl, which also flushes.
+    void operator()(const AMQFrame& frame)
+    {
+        stream << frame << std::endl;
+    }
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Tue Aug 28 12:38:17 2007
@@ -19,12 +19,14 @@
  *
  */
 #include "qpid/broker/BrokerChannel.h"
-#include "qpid/broker/BrokerMessage.h"
 #include "qpid/broker/BrokerQueue.h"
 #include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid_test_plugin.h"
 #include <iostream>
+#include <sstream>
 #include <memory>
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/AMQFrame.h"
@@ -72,7 +74,6 @@
     CPPUNIT_TEST_SUITE(BrokerChannelTest);
     CPPUNIT_TEST(testConsumerMgmt);;
     CPPUNIT_TEST(testDeliveryNoAck);
-    CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST(testQueuePolicy);
     CPPUNIT_TEST(testFlow);
     CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
@@ -155,7 +156,16 @@
 
         void check()
         {
-            CPPUNIT_ASSERT(expected.empty());
+            if (!expected.empty()) {
+                std::stringstream error;
+                error << "Expected: ";
+                while (!expected.empty()) {
+                    MethodCall& m = expected.front();
+                    error << m.name << "(" << m.msg << ", '" << m.data << "'); ";
+                    expected.pop();
+                }
+                CPPUNIT_FAIL(error.str());
+            }
         }
     };
 
@@ -173,7 +183,7 @@
 
     void testConsumerMgmt(){
         Queue::shared_ptr queue(new Queue("my_queue"));
-        Channel channel(connection, recorder, 0, 0);
+        Channel channel(connection, recorder, 0);
         channel.open();
         CPPUNIT_ASSERT(!channel.exists("my_consumer"));
 
@@ -203,7 +213,7 @@
         Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
         Queue::shared_ptr queue(new Queue("my_queue"));
         string tag("test");
-        DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+        DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
         channel.consume(token, tag, queue, false, false, 0);
         queue->deliver(msg);
 	sleep(2);
@@ -213,48 +223,6 @@
         CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
     }
 
-    void testStaging(){
-        MockMessageStore store;
-        connection.setFrameMax(1000);
-        connection.setStagingThreshold(10);
-        Channel channel(connection, recorder, 1, &store);
-        const string data[] = {"abcde", "fghij", "klmno"};
-        
-        Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
-
-        store.expect();
-        store.stage(*msg);
-        for (int i = 0; i < 3; i++) {
-            store.appendContent(*msg, data[i]);
-        }
-        store.destroy(*msg);
-        store.test();
-
-        Exchange::shared_ptr exchange  =
-            broker->getExchanges().declare("my_exchange", "fanout").first;
-        Queue::shared_ptr queue(new Queue("my_queue"));
-        exchange->bind(queue, "", 0);
-
-        AMQHeaderBody header(BASIC);
-        uint64_t contentSize(0);
-        for (int i = 0; i < 3; i++) {
-            contentSize += data[i].size();
-        }
-        header.setContentSize(contentSize);
-        channel.handlePublish(msg);
-        channel.handleHeader(&header);
-
-        for (int i = 0; i < 3; i++) {
-            AMQContentBody body(data[i]);
-            channel.handleContent(&body);
-        }
-        Message::shared_ptr msg2 = queue->dequeue();
-        CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
-        msg2.reset();//should trigger destroy call
-
-        store.check();
-    }
-
 
     //NOTE: strictly speaking this should/could be part of QueueTest,
     //but as it can usefully use the same utility classes as this
@@ -279,7 +247,6 @@
         
         store.expect();
         store.stage(*msg3);
-        store.destroy(*msg3);
         store.test();
 
         Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
@@ -348,16 +315,17 @@
         CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
         CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
         
-        const string data("abcdefghijklmn");
-
-        Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
-        addContent(msg, data);
         Queue::shared_ptr queue(new Queue("my_queue"));
         string tag("test");
-        DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+        DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
         channel.consume(token, tag, queue, false, false, 0);
         channel.flow(false);
+
+        //'publish' a message
+        Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+        addContent(msg, "abcdefghijklmn");
         queue->deliver(msg);
+
         //ensure no messages have been delivered
         CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
 
@@ -369,21 +337,26 @@
         CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
     }
 
-    Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+    Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
     {
-        BasicMessage* msg = new BasicMessage(
-            0, exchange, routingKey, false, false);
-        AMQHeaderBody header(BASIC);
-        header.setContentSize(contentSize);        
-        msg->setHeader(&header);
-        msg->getHeaderProperties()->setMessageId(messageId);
+        Message::shared_ptr msg(new Message());
+
+        AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+        AMQFrame header(0, AMQHeaderBody());
+
+        msg->getFrames().append(method);
+        msg->getFrames().append(header);
+        MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        props->setContentLength(contentSize);        
+        props->setMessageId(messageId);
+        msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         return msg;
     }
 
     void addContent(Message::shared_ptr msg, const string& data)
     {
-        AMQContentBody body(data);
-        msg->addContent(&body);
+        AMQFrame content(0, AMQContentBody(data));
+        msg->getFrames().append(content);
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Tue Aug 28 12:38:17 2007
@@ -34,7 +34,7 @@
 /** Verify membership in a cluster with one member. */
 BOOST_AUTO_TEST_CASE(testClusterOne) {
     TestCluster cluster("clusterOne", "amqp:one:1");
-    AMQFrame send(VER, 1, SessionOpenBody(VER));
+    AMQFrame send(1, SessionOpenBody(VER));
     cluster.handle(send);
     AMQFrame received;
     BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -60,7 +60,7 @@
         BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
 
         // Exchange frames with child.
-        AMQFrame send(VER, 1, SessionOpenBody(VER));
+        AMQFrame send(1, SessionOpenBody(VER));
         cluster.handle(send);
         AMQFrame received;
         BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -91,8 +91,8 @@
     
 /** Test the ClassifierHandler */
 BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
-    AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
-    AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
+    AMQFrame queueDecl(0, QueueDeclareBody(VER));
+    AMQFrame messageTrans(0, MessageTransferBody(VER));
     shared_ptr<CountHandler> wiring(new CountHandler());
     shared_ptr<CountHandler> other(new CountHandler());
     

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Tue Aug 28 12:38:17 2007
@@ -40,7 +40,7 @@
     BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
     BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
 
-    AMQFrame send(VER, 1, SessionAttachedBody(VER));
+    AMQFrame send(1, SessionAttachedBody(VER));
     cluster.handle(send);
     BOOST_REQUIRE(cluster.received.waitPop(frame));
     BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Tue Aug 28 12:38:17 2007
@@ -31,6 +31,7 @@
 #include "qpid_test_plugin.h"
 #include <iostream>
 #include "qpid/framing/BasicGetBody.h"
+#include "MessageUtils.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -63,7 +64,7 @@
         queue.reset();
         queue2.reset();
 
-        Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+        Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
         DeliverableMessage msg(msgPtr);
         topic.route(msg, "abc", 0);
         direct.route(msg, "abc", 0);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Tue Aug 28 12:38:17 2007
@@ -137,8 +137,7 @@
     {
         std::string a = "hostA";
         std::string b = "hostB";
-        AMQFrame in(version, 999,
-                    ConnectionRedirectBody(version, a, b));
+        AMQFrame in(999, ConnectionRedirectBody(version, a, b));
         in.encode(buffer);
         buffer.flip(); 
         AMQFrame out;
@@ -149,7 +148,7 @@
     void testBasicConsumeOkBodyFrame()
     {
         std::string s = "hostA";
-        AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
+        AMQFrame in(999, BasicConsumeOkBody(version, s));
         in.encode(buffer);
         buffer.flip(); 
         AMQFrame out;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp Tue Aug 28 12:38:17 2007
@@ -36,8 +36,8 @@
 
     void testGenericProperties() 
     {
-        AMQHeaderBody body(BASIC);
-        dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE");
+        AMQHeaderBody body;
+        body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE");
         Buffer buffer(100);
 
         body.encode(buffer);
@@ -45,7 +45,7 @@
         AMQHeaderBody body2;
         body2.decode(buffer, body.size());
         BasicHeaderProperties* props =
-            dynamic_cast<BasicHeaderProperties*>(body2.getProperties());
+            body2.get<BasicHeaderProperties>(true);
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
                              props->getHeaders().getString("A"));
     }
@@ -64,10 +64,11 @@
 	string userId("guest");
 	string appId("just testing");
 	string clusterId("no clustering required");
+        uint64_t contentLength(54321);
 
-        AMQHeaderBody body(BASIC);
+        AMQFrame out(0, AMQHeaderBody());
         BasicHeaderProperties* properties = 
-            dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+            out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
         properties->setContentType(contentType);
         properties->getHeaders().setString("A", "BCDE");
         properties->setDeliveryMode(deliveryMode);
@@ -81,13 +82,14 @@
         properties->setUserId(userId);
         properties->setAppId(appId);
         properties->setClusterId(clusterId);
+        properties->setContentLength(contentLength);
 
         Buffer buffer(10000);
-        body.encode(buffer);
+        out.encode(buffer);
         buffer.flip();     
-        AMQHeaderBody temp;
-        temp.decode(buffer, body.size());
-        properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+        AMQFrame in;
+        in.decode(buffer);
+        properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
 
         CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A"));
@@ -102,6 +104,7 @@
         CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId());
         CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId());
         CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId());
+        CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength());
     }
 
     void testSomeSpecificProperties(){
@@ -111,9 +114,9 @@
         string expiration("Z");
         uint64_t timestamp(0xabe4a34a);
 
-        AMQHeaderBody body(BASIC);
+        AMQHeaderBody body;
         BasicHeaderProperties* properties = 
-            dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+            body.get<BasicHeaderProperties>(true);
         properties->setContentType(contentType);
         properties->setDeliveryMode(deliveryMode);
         properties->setPriority(priority);
@@ -125,7 +128,7 @@
         buffer.flip();     
         AMQHeaderBody temp;
         temp.decode(buffer, body.size());
-        properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+        properties = temp.get<BasicHeaderProperties>(true);
 
         CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
         CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Aug 28 12:38:17 2007
@@ -82,11 +82,8 @@
   DtxWorkRecordTest     \
   ExchangeTest		\
   HeadersExchangeTest	\
-  InMemoryContentTest	\
-  LazyLoadedContentTest	\
   MessageBuilderTest	\
   MessageTest		\
-  ReferenceTest         \
   QueueRegistryTest	\
   QueueTest		\
   QueuePolicyTest	\
@@ -142,6 +139,7 @@
   .valgrind.supp-default						\
   .valgrindrc-default							\
   InProcessBroker.h							\
+  MessageUtils.h								\
   MockChannel.h								\
   MockConnectionInputHandler.h						\
   TxMocks.h								\