You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/13 19:29:18 UTC

svn commit: r575377 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/

Author: gsim
Date: Thu Sep 13 10:29:16 2007
New Revision: 575377

URL: http://svn.apache.org/viewvc?rev=575377&view=rev
Log:
Use the frameset begin/end flags to determine frameset boundaries.
Set the frameset and segment begin/end flags for content-bearing methods (i.e. messages).


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/python/qpid/connection.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Thu Sep 13 10:29:16 2007
@@ -126,6 +126,7 @@
 void Queue::requestDispatch(Consumer* c, bool sync){
     if (!c || c->preAcquires()) {
         if (sync) {
+	    Mutex::ScopedLock locker(messageLock);
             dispatch();
         } else {
             serializer.execute(dispatchCallback);
@@ -153,7 +154,9 @@
         int start = next;
         while(c){
             next++;
-            if(c->deliver(msg)) return true;            
+            if(c->deliver(msg)) {
+                return true;            
+            }
             next = next % acquirers.size();
             c = next == start ? 0 : acquirers[next];            
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Sep 13 10:29:16 2007
@@ -30,12 +30,15 @@
                                Queue::shared_ptr _queue, 
                                const string _consumerTag, 
                                const DeliveryId _id,
-                               bool _acquired) : msg(_msg), 
-                                                                queue(_queue), 
-                                                                consumerTag(_consumerTag),
-                                                                id(_id),
-                                                                acquired(_acquired),
-                                                                pull(false){}
+                               bool _acquired, bool _confirmed) : msg(_msg), 
+                                                                  queue(_queue), 
+                                                                  consumerTag(_consumerTag),
+                                                                  id(_id),
+                                                                  acquired(_acquired),
+                                                                  confirmed(_confirmed),
+                                                                  pull(false)
+{
+}
 
 DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
@@ -44,11 +47,12 @@
                                                                 consumerTag(""),
                                                                 id(_id),
                                                                 acquired(true),
+                                                                confirmed(false),
                                                                 pull(true){}
 
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
-    if (acquired) {
+    if (acquired && !confirmed) {
         queue->dequeue(ctxt, msg.payload);
     }
 }
@@ -70,24 +74,30 @@
 }
 
 void DeliveryRecord::redeliver(Session* const session) const{
-    if(pull){
-        //if message was originally sent as response to get, we must requeue it
-        requeue();
-    }else{
-        session->deliver(msg.payload, consumerTag, id);
+    if (!confirmed) {
+        if(pull){
+            //if message was originally sent as response to get, we must requeue it
+            requeue();
+        }else{
+            session->deliver(msg.payload, consumerTag, id);
+        }
     }
 }
 
 void DeliveryRecord::requeue() const
 {
-    msg.payload->redeliver();
-    queue->requeue(msg);
+    if (!confirmed) {
+        msg.payload->redeliver();
+        queue->requeue(msg);
+    }
 }
 
 void DeliveryRecord::release() 
 {
-    queue->requeue(msg);
-    acquired = false;
+    if (!confirmed) {
+        queue->requeue(msg);
+        acquired = false;
+    }
 }
 
 void DeliveryRecord::reject() 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Sep 13 10:29:16 2007
@@ -45,11 +45,12 @@
     const std::string consumerTag;
     const DeliveryId id;
     bool acquired;
+    const bool confirmed;
     const bool pull;
 
   public:
     DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, 
-                   const DeliveryId id, bool acquired);
+                   const DeliveryId id, bool acquired, bool confirmed = false);
     DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
             
     void dequeue(TransactionContext* ctxt = 0) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Sep 13 10:29:16 2007
@@ -144,7 +144,7 @@
     if (isContentReleased()) {
         //load content from store in chunks of maxContentSize
         uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
-        uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load?
+        uint64_t expectedSize(frames.getHeaders()->getContentLength());
         for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
         {            
             uint64_t remaining = expectedSize - offset;
@@ -153,11 +153,22 @@
 
             store->loadContent(*this, data, offset,
                                remaining > maxContentSize ? maxContentSize : remaining);
+            frame.setBof(false);
+            if (offset > 0) {
+                frame.setBos(false);
+            }
+            if (remaining) {
+                frame.setEos(false);
+                frame.setEof(false);
+            }
             out.handle(frame);
         }
 
     } else {
-        SendContent f(out, channel, maxFrameSize);
+        Count c;
+        frames.map_if(c, TypeFilter(CONTENT_BODY));
+
+        SendContent f(out, channel, maxFrameSize, c.getCount());
         frames.map_if(f, TypeFilter(CONTENT_BODY));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Thu Sep 13 10:29:16 2007
@@ -39,7 +39,7 @@
 struct BaseToken : DeliveryToken
 {
     virtual ~BaseToken() {}
-    virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+    virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0;
 };
 
 struct BasicGetToken : BaseToken
@@ -50,12 +50,11 @@
 
     BasicGetToken(Queue::shared_ptr q) : queue(q) {}
 
-    void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+    AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
     {
-        channel.send(BasicGetOkBody(
-            channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+        return AMQFrame(0, BasicGetOkBody(
+            ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
             msg->getRoutingKey(), queue->getMessageCount())); 
-
     }
 };
 
@@ -67,10 +66,10 @@
 
     BasicConsumeToken(const string c) : consumer(c) {}
 
-    void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+    AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
     {
-        channel.send(BasicDeliverBody(
-            channel.getVersion(), consumer, id.getValue(),
+        return AMQFrame(0, BasicDeliverBody(
+            ProtocolVersion(), consumer, id.getValue(),
             msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
     }
 
@@ -85,16 +84,13 @@
     MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : 
         destination(d), confirmMode(c), acquireMode(a) {}
 
-    void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+    AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/)
     {
-        //TODO; need to figure out how the acquire mode gets
-        //communicated (this is just a temporary solution)
-        channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
-
         //may need to set the redelivered flag:
         if (msg->getRedelivered()){
             msg->getProperties<DeliveryProperties>()->setRedelivered(true);
         }
+        return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode));
     }
 };
 
@@ -127,11 +123,15 @@
     //another may well have the wrong headers; however we will only
     //have one content class for 0-10 proper
 
+    FrameHandler& handler = channel.getHandlers().out;
+
     //send method
     boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
-    t->sendMethod(msg, channel, id);
+    AMQFrame method = t->sendMethod(msg, id);
+    method.setEof(false);
+    method.setChannel(channel.getId());
+    handler.handle(method);
 
-    FrameHandler& handler = channel.getHandlers().out;
     msg->sendHeader(handler, channel.getId(), framesize);
     msg->sendContent(handler, channel.getId(), framesize);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Sep 13 10:29:16 2007
@@ -172,10 +172,11 @@
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
 {
     Mutex::ScopedLock l(outLock);
-    SequenceNumber copy(outgoing.hwm);
-    ++copy;
-    MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
-    return outgoing.hwm.getValue();
+    //SequenceNumber copy(outgoing.hwm);
+    //++copy;
+    MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+    return outgoing.hwm;
+    //return outgoing.hwm.getValue();
 }
 
 void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Thu Sep 13 10:29:16 2007
@@ -268,8 +268,8 @@
 
             DeliveryId deliveryTag =
                 parent->deliveryAdapter->deliver(msg.payload, token);
-            if (ackExpected) {
-                parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
+            if (windowing || ackExpected) {
+                parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
             }
         }
         return !blocked;
@@ -565,12 +565,14 @@
     ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
     ack_iterator end = start;
      
-    if (first == last) {
-        //just acked single element (move end past it)
-        ++end;
-    } else {
-        //need to find end (position it just after the last record in range)
-        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+    if (start != unacked.end()) {
+        if (first == last) {
+            //just acked single element (move end past it)
+            ++end;
+        } else {
+            //need to find end (position it just after the last record in range)
+            end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+        }
     }
     return AckRange(start, end);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h Thu Sep 13 10:29:16 2007
@@ -1,72 +1,25 @@
 #ifndef _client_AckMode_h
 #define _client_AckMode_h
 
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace client {
-
-/**
- * The available acknowledgements modes.
- * 
- * \ingroup clientapi
- */
-enum AckMode {
-    /** No acknowledgement will be sent, broker can
-        discard messages as soon as they are delivered
-        to a consumer using this mode. **/
-    NO_ACK     = 0,  
-    /** Each message will be automatically
-        acknowledged as soon as it is delivered to the
-        application **/  
-    AUTO_ACK   = 1,  
-    /** Acknowledgements will be sent automatically,
-        but not for each message. **/
-    LAZY_ACK   = 2,
-    /** The application is responsible for explicitly
-        acknowledging messages. **/  
-    CLIENT_ACK = 3 
-};
-
-}} // namespace qpid::client
-
-
-
-#endif  /*!_client_AckMode_h*/
-#ifndef _client_AckMode_h
-#define _client_AckMode_h
 
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 
@@ -97,6 +50,4 @@
 
 }} // namespace qpid::client
 
-
-
-#endif  /*!_client_AckMode_h*/
+#endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Thu Sep 13 10:29:16 2007
@@ -58,7 +58,7 @@
         if (body->getMethod()) 
             handleMethod(body->getMethod());
         else
-            throw new ConnectionException(504, "Channel not open.");
+            throw ConnectionException(504, "Channel not open for content.");
     }
 }
 
@@ -68,7 +68,7 @@
         frame.setChannel(id);
         out(frame);
     } else if (getState() == CLOSED) {
-        throw Exception("Channel not open");
+        throw Exception(QPID_MSG("Channel not open, can't send " << frame));
     } else if (getState() == CLOSED_BY_PEER) {
         throw ChannelException(code, text);
     }
@@ -120,7 +120,7 @@
         } //else just ignore it
         break;
       case CLOSED:
-        throw ConnectionException(504, "Channel not opened.");
+        throw ConnectionException(504, "Channel is closed.");
       default:
         throw Exception("Unexpected state encountered in ChannelHandler!");
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Sep 13 10:29:16 2007
@@ -69,7 +69,7 @@
 
 void Channel::open(const Session& s)
 {
-    Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(stopLock);
     if (isOpen())
         THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
     active = true;
@@ -80,7 +80,7 @@
 }
     
 bool Channel::isOpen() const { 
-    Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(stopLock);
     return active; 
 }
 
@@ -146,7 +146,7 @@
         Consumer& c = consumers[tag];
         c.listener = listener;
         c.ackMode = ackMode;
-        c.lastDeliveryTag = 0;
+        c.count = 0;
     }
     uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
     ScopedSync s(session, synch);
@@ -205,7 +205,7 @@
 {
     session.close();
     {
-        Mutex::ScopedLock l(lock);
+        Mutex::ScopedLock l(stopLock);
         active = false;
     }
     stop();
@@ -231,20 +231,18 @@
 
 void Channel::dispatch(FrameSet& content, const std::string& destination)
 {
-    MessageListener* listener(0);
-    {
-        Mutex::ScopedLock l(lock);
-        ConsumerMap::iterator i = consumers.find(destination);
-        if (i != consumers.end()) {
-            Message msg;
-            msg.populate(content);
-            listener = i->second.listener;
-        }           
-    }    
-    if (listener) {
+    ConsumerMap::iterator i = consumers.find(destination);
+    if (i != consumers.end()) {
         Message msg;
         msg.populate(content);
+        MessageListener* listener = i->second.listener;
         listener->received(msg);
+        if (isOpen() && i->second.ackMode != CLIENT_ACK) {
+            bool send = i->second.ackMode == AUTO_ACK 
+                || (prefetch &&  ++(i->second.count) > (prefetch / 2));
+            if (send) i->second.count = 0;
+            session.execution().completed(content.getId(), true, send);
+        }
     } else {
         QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);                        
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Sep 13 10:29:16 2007
@@ -63,8 +63,7 @@
     struct Consumer{
         MessageListener* listener;
         AckMode ackMode;
-        int count;
-        u_int64_t lastDeliveryTag;
+        uint32_t count;
     };
     typedef std::map<std::string, Consumer> ConsumerMap;
         
@@ -75,7 +74,7 @@
     const bool transactional;
     framing::ProtocolVersion version;
 
-    sys::Mutex stopLock;
+    mutable sys::Mutex stopLock;
     bool running;
 
     ConsumerMap consumers;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp Thu Sep 13 10:29:16 2007
@@ -25,14 +25,15 @@
 using namespace qpid::framing;
 using namespace boost;
 
-void Correlator::receive(AMQMethodBody* response)
+bool Correlator::receive(const AMQMethodBody* response)
 {
     if (listeners.empty()) {
-        throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+        return false;
     } else {
         Listener l = listeners.front();
         if (l) l(response);
         listeners.pop();
+        return true;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h Thu Sep 13 10:29:16 2007
@@ -36,9 +36,9 @@
 class Correlator
 {
 public:
-    typedef boost::function<void(framing::AMQMethodBody*)> Listener;
+    typedef boost::function<void(const framing::AMQMethodBody*)> Listener;
 
-    void receive(framing::AMQMethodBody*);
+    bool receive(const framing::AMQMethodBody*);
     void listen(Listener l);
 
 private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Sep 13 10:29:16 2007
@@ -62,19 +62,16 @@
 {
     AMQBody* body = frame.getBody();
     if (!invoke(body, this)) {
-        if (isContentFrame(frame)) {
-            if (!arriving) {
-                arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
-            }
-            arriving->append(frame);
-            if (arriving->isComplete()) {
+        if (!arriving) {
+            arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+        }
+        arriving->append(frame);
+        if (arriving->isComplete()) {
+            if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) {
                 demux.handle(arriving);
-                arriving.reset();
-            }
-        } else {
-            ++incoming.hwm;    
-            correlation.receive(body->getMethod());
-        }        
+            }        
+            arriving.reset();
+        }
     }
 }
 
@@ -168,11 +165,19 @@
 
 SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
 {
+    return send(command, l, false);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
+{
     SequenceNumber id = ++outgoing.hwm;
     if(l) {
         completion.listenForResult(id, l);
     }
     AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+    if (hasContent) {
+        frame.setEof(false);
+    }
     out(frame);
     return id;
 }
@@ -180,7 +185,7 @@
 SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, 
                                       CompletionTracker::ResultListener l)
 {
-    SequenceNumber id = send(command, l);
+    SequenceNumber id = send(command, l, true);
     sendContent(content);
     return id;
 }
@@ -188,14 +193,16 @@
 void ExecutionHandler::sendContent(const MethodContent& content)
 {
     AMQFrame header(0, content.getHeader());
-    out(header);
-
+    header.setBof(false);
     u_int64_t data_length = content.getData().length();
     if(data_length > 0){
+        header.setEof(false);
+        out(header);   
         //frame itself uses 8 bytes
         u_int32_t frag_size = maxFrameSize - 8;
         if(data_length < frag_size){
             AMQFrame frame(0, AMQContentBody(content.getData()));
+            frame.setBof(false);
             out(frame);
         }else{
             u_int32_t offset = 0;
@@ -204,10 +211,20 @@
                 u_int32_t length = remaining > frag_size ? frag_size : remaining;
                 string frag(content.getData().substr(offset, length));
                 AMQFrame frame(0, AMQContentBody(frag));
-                out(frame);
+                frame.setBof(false);
+                if (offset > 0) {
+                    frame.setBos(false);
+                }
                 offset += length;
                 remaining = data_length - offset;
+                if (remaining) {
+                    frame.setEos(false);
+                    frame.setEof(false);
+                }
+                out(frame);
             }
         }
+    } else {
+        out(header);   
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Sep 13 10:29:16 2007
@@ -59,6 +59,7 @@
 
     void sendCompletion();
 
+    framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent);
     void sendContent(const framing::MethodContent&);
 
 public:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Sep 13 10:29:16 2007
@@ -35,7 +35,7 @@
     return response.get();
 }
 
-void FutureResponse::received(AMQMethodBody* r)
+void FutureResponse::received(const AMQMethodBody* r)
 {
     Monitor::ScopedLock l(lock);
     response = *r;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Sep 13 10:29:16 2007
@@ -36,7 +36,7 @@
     framing::MethodHolder response;
 public:
     framing::AMQMethodBody* getResponse(SessionCore& session);
-    void received(framing::AMQMethodBody* response);
+    void received(const framing::AMQMethodBody* response);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Thu Sep 13 10:29:16 2007
@@ -156,7 +156,9 @@
 
 std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
 {
-    return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+    return out << "Frame[" 
+        //<< "B=" << f.getBof() << "E=" << f.getEof() << "b=" << f.getBos() << "e=" << f.getEos() << "; "
+               << "channel=" << f.getChannel() << "; " << *f.getBody()
                << "]";
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Thu Sep 13 10:29:16 2007
@@ -74,6 +74,17 @@
     void encode(Buffer& buffer) const; 
     bool decode(Buffer& buffer); 
     uint32_t size() const;
+
+    bool getBof() const { return bof; }
+    void setBof(bool isBof) { bof = isBof; }
+    bool getEof() const { return eof; }
+    void setEof(bool isEof) { eof = isEof; }
+
+    bool getBos() const { return bos; }
+    void setBos(bool isBos) { bos = isBos; }
+    bool getEos() const { return eos; }
+    void setEos(bool isEos) { eos = isEos; }
+
     static uint32_t frameOverhead();
 
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Thu Sep 13 10:29:16 2007
@@ -38,20 +38,13 @@
 
 bool FrameSet::isComplete() const
 {
-    //TODO: should eventually use the 0-10 frame header flags when available
+    return !parts.empty() && parts.back().getEof();
+}
+
+bool FrameSet::isContentBearing() const
+{
     const AMQMethodBody* method = getMethod();
-    if (!method) {
-        return false;
-    } else if (method->isContentBearing()) {
-        const AMQHeaderBody* header = getHeaders();
-        if (header) {
-            return header->getContentLength() == getContentSize();
-        } else {
-            return false;
-        }
-    } else {
-        return true;        
-    }
+    return method && method->isContentBearing();
 }
 
 const AMQMethodBody* FrameSet::getMethod() const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Thu Sep 13 10:29:16 2007
@@ -50,6 +50,8 @@
     void getContent(std::string&) const;
     std::string getContent() const;
 
+    bool isContentBearing() const;
+
     const AMQMethodBody* getMethod() const;
     const AMQHeaderBody* getHeaders() const;
     AMQHeaderBody* getHeaders();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Thu Sep 13 10:29:16 2007
@@ -21,31 +21,47 @@
 
 #include "SendContent.h"
 
-qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {}
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c), 
+                                                                                              maxFrameSize(mfs),
+                                                                                               expectedFrameCount(efc), frameCount(0) {}
 
-void qpid::framing::SendContent::operator()(AMQFrame& f) const
+void qpid::framing::SendContent::operator()(const AMQFrame& f)
 {
+    bool first = frameCount == 0;
+    bool last = ++frameCount == expectedFrameCount;
+
     uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
     const AMQContentBody* body(f.castBody<AMQContentBody>()); 
     if (body->size() > maxContentSize) {
         uint32_t offset = 0;
         for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) {
-            sendFragment(*body, offset, maxContentSize);
+            sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->size());
             offset += maxContentSize;
         }
         uint32_t remainder = body->size() % maxContentSize;
         if (remainder) {
-            sendFragment(*body, offset, remainder);
+            sendFragment(*body, offset, remainder, first && offset == 0, last);
         }
     } else {
         AMQFrame copy(f);
+        setFlags(copy, first, last);
         copy.setChannel(channel);
         handler.handle(copy);
     }        
 }
 
-void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
 {
     AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+    setFlags(fragment, first, last);
     handler.handle(fragment);
 }
+
+void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const
+{
+    f.setBof(false);
+    f.setBos(first);
+    f.setEof(last);
+    f.setEos(last);
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Thu Sep 13 10:29:16 2007
@@ -39,11 +39,14 @@
     mutable FrameHandler& handler;
     const uint16_t channel;
     const uint16_t maxFrameSize;
+    uint expectedFrameCount;
+    uint frameCount;
 
-    void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+    void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const;
+    void setFlags(AMQFrame& f, bool first, bool last) const;
 public:
-    SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
-    void operator()(AMQFrame& f) const;
+    SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount);
+    void operator()(const AMQFrame& f);
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Thu Sep 13 10:29:16 2007
@@ -63,7 +63,10 @@
 
 void TransferContent::populate(const FrameSet& frameset)
 {
-    header = *frameset.getHeaders();
+    const AMQHeaderBody* h = frameset.getHeaders();
+    if (h) {
+        header = *h;
+    }
     frameset.getContent(data);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Thu Sep 13 10:29:16 2007
@@ -49,6 +49,15 @@
     uint64_t getSize() { return size; }
 };
 
+class Count
+{
+    uint count;
+public:
+    Count() : count(0) {}
+    void operator()(const AMQFrame&) { count++; }
+    uint getCount() { return count; }
+};
+
 class EncodeFrame
 {
     Buffer& buffer;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Sep 13 10:29:16 2007
@@ -126,6 +126,10 @@
         AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
         AMQFrame header(0, AMQHeaderBody());
         AMQFrame content(0, AMQContentBody(data));
+        method.setEof(false);
+        header.setBof(false);
+        header.setEof(false);
+        content.setBof(false);
 
         header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());        
         header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
@@ -156,6 +160,12 @@
         AMQFrame header(0, AMQHeaderBody());
         AMQFrame content1(0, AMQContentBody(data1));
         AMQFrame content2(0, AMQContentBody(data2));
+        method.setEof(false);
+        header.setBof(false);
+        header.setEof(false);
+        content1.setBof(false);
+        content1.setEof(false);
+        content2.setBof(false);
 
         header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());        
         header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Thu Sep 13 10:29:16 2007
@@ -114,6 +114,7 @@
             channel.consume(control, "c1", &listener, AckMode(args.ackmode));
             cout << "topic_listener: Consuming." << endl;
             channel.run();
+            cout << "topic_listener: run returned, closing connection" << endl;
             connection.close();
             cout << "topic_listener: normal exit" << endl;
         }

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Thu Sep 13 10:29:16 2007
@@ -122,7 +122,13 @@
 
   def write_0_10(self, frame):
     c = self.codec
-    c.encode_octet(0x0f) # TODO: currently fixed at ver=0, B=E=b=e=1
+    flags = 0
+    if frame.bof: flags |= 0x08
+    if frame.eof: flags |= 0x04
+    if frame.bos: flags |= 0x02
+    if frame.eos: flags |= 0x01
+
+    c.encode_octet(flags) # ver=0 (no version bits set); B/E/b/e bits come from the frame's bof/eof/bos/eos
     c.encode_octet(self.spec.constants.byname[frame.type].id)
     body = StringIO()
     enc = codec.Codec(body, self.spec)
@@ -197,6 +203,10 @@
   def init(self, args, kwargs):
     self.channel = kwargs.pop("channel", 0)
     self.subchannel = kwargs.pop("subchannel", 0)
+    self.bos = True
+    self.eos = True
+    self.bof = True
+    self.eof = True
 
   def encode(self, enc): abstract
 
@@ -216,6 +226,7 @@
     self.method = method
     self.method_type = method
     self.args = args
+    self.eof = not method.content
 
   def encode(self, c):
     c.encode_short(self.method.klass.id)
@@ -302,6 +313,8 @@
     self.weight = weight
     self.size = size
     self.properties = properties
+    self.eof = size == 0
+    self.bof = False
 
   def __getitem__(self, name):
     return self.properties[name]
@@ -429,6 +442,8 @@
 
   def __init__(self, content):
     self.content = content
+    self.eof = True
+    self.bof = False
 
   def encode(self, enc):
     enc.write(self.content)