You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/12 16:49:15 UTC

svn commit: r574979 - in /incubator/qpid/trunk/qpid: cpp/rubygen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/ python/qpid/ python/tests_0-10/

Author: gsim
Date: Wed Sep 12 07:49:12 2007
New Revision: 574979

URL: http://svn.apache.org/viewvc?rev=574979&view=rev
Log:
In ClientChannel: Use subscribe and flush in place of get; use per-subscriber flow control for managing prefetches.
In the broker's Session: set credit to 0 when a subscription is created (Python tests modified accordingly)


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
    incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
    incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
    incubator/qpid/trunk/qpid/python/tests_0-10/example.py
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
    incubator/qpid/trunk/qpid/python/tests_0-10/tx.py

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Wed Sep 12 07:49:12 2007
@@ -43,7 +43,7 @@
   end
 
   def printable_form(f)
-    if (f.cpptype.name == "u_int8_t")
+    if (f.cpptype.name == "uint8_t")
       return "(int) " + f.cppname
     else
       return f.cppname

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 12 07:49:12 2007
@@ -212,6 +212,7 @@
   qpid/client/ClientQueue.cpp			\
   qpid/client/ConnectionImpl.cpp		\
   qpid/client/Connector.cpp			\
+  qpid/client/Demux.cpp				\
   qpid/client/MessageListener.cpp		\
   qpid/client/Correlator.cpp			\
   qpid/client/CompletionTracker.cpp		\
@@ -297,7 +298,9 @@
   qpid/client/ConnectionImpl.h \
   qpid/client/Connector.h \
   qpid/client/Completion.h \
+  qpid/client/Demux.h \
   qpid/client/MessageListener.h \
+  qpid/client/MessageQueue.h \
   qpid/client/BlockingQueue.h \
   qpid/client/Correlator.h \
   qpid/client/CompletionTracker.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Sep 12 07:49:12 2007
@@ -126,7 +126,7 @@
 void Queue::requestDispatch(Consumer* c, bool sync){
     if (!c || c->preAcquires()) {
         if (sync) {
-            serializer.dispatch();
+            dispatch();
         } else {
             serializer.execute(dispatchCallback);
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Wed Sep 12 07:49:12 2007
@@ -251,8 +251,8 @@
     acquire(_acquire),
     blocked(false), 
     windowing(true), 
-    msgCredit(0xFFFFFFFF), 
-    byteCredit(0xFFFFFFFF) {}
+    msgCredit(0), 
+    byteCredit(0) {}
 
 bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Wed Sep 12 07:49:12 2007
@@ -62,7 +62,7 @@
         }
     }
 
-    void push(T t)
+    void push(const T& t)
     {
         sys::Monitor::ScopedLock l(lock);
         bool wasEmpty = queue.empty();
@@ -77,6 +77,12 @@
         sys::Monitor::ScopedLock l(lock);
         closed = true;
         lock.notifyAll();
+    }
+    
+    bool empty()
+    {
+        sys::Monitor::ScopedLock l(lock);
+        return queue.empty();
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Wed Sep 12 07:49:12 2007
@@ -36,7 +36,9 @@
         ChannelCloseBody* closeBody=
             dynamic_cast<ChannelCloseBody*>(body->getMethod());
         if (closeBody) {
-            setState(CLOSED);
+            setState(CLOSED_BY_PEER);
+            code = closeBody->getReplyCode();
+            text = closeBody->getReplyText();
             if (onClose) {
                 onClose(closeBody->getReplyCode(), closeBody->getReplyText());
             }
@@ -65,8 +67,10 @@
     if (getState() == OPEN) {
         frame.setChannel(id);
         out(frame);
-    } else {
+    } else if (getState() == CLOSED) {
         throw Exception("Channel not open");
+    } else if (getState() == CLOSED_BY_PEER) {
+        throw ChannelException(code, text);
     }
 }
 
@@ -80,7 +84,7 @@
 
     std::set<int> states;
     states.insert(OPEN);
-    states.insert(CLOSED);
+    states.insert(CLOSED_BY_PEER);
     waitFor(states);
     if (getState() != OPEN) {
         throw Exception("Failed to open channel.");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h Wed Sep 12 07:49:12 2007
@@ -30,9 +30,12 @@
 
 class ChannelHandler : private StateManager, public ChainableFrameHandler
 {
-    enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+    enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
     framing::ProtocolVersion version;
     uint16_t id;
+    
+    uint16_t code;
+    std::string text;
 
     void handleMethod(framing::AMQMethodBody* method);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Wed Sep 12 07:49:12 2007
@@ -26,8 +26,10 @@
 #include "ClientMessage.h"
 #include "qpid/QpidError.h"
 #include "Connection.h"
+#include "Demux.h"
 #include "FutureResponse.h"
 #include "MessageListener.h"
+#include "MessageQueue.h"
 #include <boost/format.hpp>
 #include <boost/bind.hpp>
 #include "qpid/framing/all_method_bodies.h"
@@ -72,6 +74,9 @@
         THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
     active = true;
     session = s;
+    if(isTransactional()) {
+        session.txSelect();
+    }
 }
     
 bool Channel::isOpen() const { 
@@ -79,17 +84,8 @@
     return active; 
 }
 
-void Channel::setQos() {
-    session.basicQos((prefetchCount=getPrefetch(), global=false));
-    if(isTransactional()) {
-        //I think this is wrong! should only send TxSelect once...
-        session.txSelect();
-    }
-}
-
-void Channel::setPrefetch(uint16_t _prefetch){
+void Channel::setPrefetch(uint32_t _prefetch){
     prefetch = _prefetch;
-    setQos();
 }
 
 void Channel::declareExchange(Exchange& _exchange, bool synch){
@@ -157,6 +153,9 @@
     session.messageSubscribe(0, _queue.getName(), tag, noLocal, 
                              confirmMode, 0/*pre-acquire*/, 
                              false, fields ? *fields : FieldTable());
+    //allocate some credit:
+    session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+    session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
 }
         
 void Channel::cancel(const std::string& tag, bool synch) {
@@ -173,21 +172,29 @@
     session.messageCancel(tag);
 }
 
-bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-    Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
-    session.execution().sendFlushRequest();
-    if (response.isA<BasicGetEmptyBody>()) {
+bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
+    string tag = "get-handler";
+    ScopedDivert handler(tag, session.execution().getDemux());
+    Demux::Queue& incoming = handler.getQueue();
+
+    session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)));
+    session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+    session.messageFlow(tag, 0/*MESSAGES*/, 1);
+    Completion status = session.messageFlush(tag);
+    status.sync();
+    session.messageCancel(tag);
+
+    if (incoming.empty()) {
         return false;
     } else {
-        FrameSet::shared_ptr content = gets.pop();
-        msg.populate(*content);
+        msg.populate(*(incoming.pop()));
         return true;
     }
 }
 
 void Channel::publish(Message& msg, const Exchange& exchange,
                       const std::string& routingKey, 
-                      bool mandatory, bool /*immediate TODO-restore immediate?*/) {
+                      bool mandatory, bool /*?TODO-restore immediate?*/) {
 
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
@@ -224,14 +231,23 @@
 
 void Channel::dispatch(FrameSet& content, const std::string& destination)
 {
-    ConsumerMap::iterator i = consumers.find(destination);
-    if (i != consumers.end()) {
+    MessageListener* listener(0);
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(destination);
+        if (i != consumers.end()) {
+            Message msg;
+            msg.populate(content);
+            listener = i->second.listener;
+        }           
+    }    
+    if (listener) {
         Message msg;
         msg.populate(content);
-        i->second.listener->received(msg);
+        listener->received(msg);
     } else {
         QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);                        
-    }               
+    }
 }
 
 void Channel::run() {
@@ -239,12 +255,8 @@
         while (true) {
             FrameSet::shared_ptr content = session.get();
             //need to dispatch this to the relevant listener:
-            if (content->isA<BasicDeliverBody>()) {
-                dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
-            } else if (content->isA<MessageTransferBody>()) {
+            if (content->isA<MessageTransferBody>()) {
                 dispatch(*content, content->as<MessageTransferBody>()->getDestination());
-            } else if (content->isA<BasicGetOkBody>()) {
-                gets.push(content);
             } else {
                 QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());                        
             }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Wed Sep 12 07:49:12 2007
@@ -71,7 +71,7 @@
     mutable sys::Mutex lock;
     sys::Thread dispatcher;
 
-    uint16_t prefetch;
+    uint32_t prefetch;
     const bool transactional;
     framing::ProtocolVersion version;
 
@@ -88,7 +88,6 @@
 
     void stop();
 
-    void setQos();
     void open(const Session& session);
     void closeInternal();
     void join();
@@ -110,7 +109,7 @@
      * messages the channel is willing to have sent to it
      * asynchronously
      */
-    Channel(bool transactional = false, u_int16_t prefetch = 500);
+    Channel(bool transactional = false, u_int16_t prefetch = 0);
      
     ~Channel();    
 
@@ -204,9 +203,9 @@
     /**
      * Change the prefetch in use.
      */
-    void setPrefetch(uint16_t prefetch);
+    void setPrefetch(uint32_t prefetch);
 
-    uint16_t getPrefetch() { return prefetch; }
+    uint32_t getPrefetch() { return prefetch; }
 
     /**
      * Start message dispatching on a new thread

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Wed Sep 12 07:49:12 2007
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Demux.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+#include <iostream>
+
+namespace qpid {
+namespace client {
+
+ByTransferDest::ByTransferDest(const std::string& d) : dest(d) {}
+bool ByTransferDest::operator()(const framing::FrameSet& frameset) const
+{
+    return frameset.isA<framing::MessageTransferBody>() &&
+        frameset.as<framing::MessageTransferBody>()->getDestination() == dest;
+}
+
+ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer) 
+{
+    queue = &(demuxer.add(dest, ByTransferDest(dest)));
+}
+
+ScopedDivert::~ScopedDivert() 
+{ 
+    demuxer.remove(dest); 
+}
+
+Demux::Queue& ScopedDivert::getQueue()
+{
+    return *queue;
+}
+
+void Demux::handle(framing::FrameSet::shared_ptr frameset)
+{
+    sys::Mutex::ScopedLock l(lock);
+    bool matched = false;
+    for (iterator i = records.begin(); i != records.end() && !matched; i++) {
+        if (i->condition && i->condition(*frameset)) {
+            matched = true;
+            i->queue->push(frameset);
+        }
+    }
+    if (!matched) {
+        defaultQueue.push(frameset);
+    }
+}
+
+void Demux::close()
+{
+    sys::Mutex::ScopedLock l(lock);
+    for (iterator i = records.begin(); i != records.end(); i++) {
+        i->queue->close();
+    }
+    defaultQueue.close();
+}
+
+Demux::Queue& Demux::add(const std::string& name, Condition condition)
+{
+    sys::Mutex::ScopedLock l(lock);
+    iterator i = std::find_if(records.begin(), records.end(), Find(name));
+    if (i == records.end()) {        
+        Record r(name, condition);
+        records.push_back(r);
+        return *(r.queue);
+    } else {
+        throw Exception("Queue already exists for " + name);
+    }
+}
+
+void Demux::remove(const std::string& name)
+{
+    sys::Mutex::ScopedLock l(lock);
+    records.remove_if(Find(name));
+}
+
+Demux::Queue& Demux::get(const std::string& name)
+{
+    sys::Mutex::ScopedLock l(lock);
+    iterator i = std::find_if(records.begin(), records.end(), Find(name));
+    if (i == records.end()) {
+        throw Exception("No queue for " + name);
+    }
+    return *(i->queue);
+}
+
+Demux::Queue& Demux::getDefault()
+{
+    return defaultQueue;
+}
+
+Demux::Find::Find(const std::string& n) : name(n) {}
+
+bool Demux::Find::operator()(const Record& record) const
+{
+    return record.name == name;
+}
+
+}}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Wed Sep 12 07:49:12 2007
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/FrameSet.h"
+#include "qpid/sys/Mutex.h"
+#include "BlockingQueue.h"
+
+#ifndef _Demux_
+#define _Demux_
+
+namespace qpid {
+namespace client {
+
+class ByTransferDest
+{
+    const std::string dest;
+public:
+    ByTransferDest(const std::string& dest);
+    bool operator()(const framing::FrameSet& frameset) const;
+};
+
+class Demux
+{
+public:
+    typedef boost::function<bool(const framing::FrameSet&)> Condition;
+    typedef BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+
+    void handle(framing::FrameSet::shared_ptr);
+    void close();
+
+    Queue& add(const std::string& name, Condition);
+    void remove(const std::string& name);
+    Queue& get(const std::string& name);
+    Queue& getDefault();
+private:
+    typedef boost::shared_ptr<Queue> QueuePtr;
+    struct Record
+    {
+        const std::string name;
+        Condition condition;
+        QueuePtr queue;
+
+        Record(const std::string& n, Condition c) : name(n), condition(c), queue(new Queue()) {}
+    };
+
+    sys::Mutex lock;
+    std::list<Record> records;
+    Queue defaultQueue;
+
+    typedef std::list<Record>::iterator iterator;
+
+    struct Find
+    {
+        const std::string name;
+        Find(const std::string& name);
+        bool operator()(const Record& record) const;
+    };
+};
+
+class ScopedDivert
+{
+    const std::string dest;
+    Demux& demuxer;
+    Demux::Queue* queue;
+public:
+    ScopedDivert(const std::string& dest, Demux& demuxer);
+    ~ScopedDivert();
+    Demux::Queue& getQueue();
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Wed Sep 12 07:49:12 2007
@@ -22,6 +22,7 @@
 #define _Execution_
 
 #include "qpid/framing/SequenceNumber.h"
+#include "Demux.h"
 
 namespace qpid {
 namespace client {
@@ -33,6 +34,7 @@
     virtual void sendSyncRequest() = 0;
     virtual void sendFlushRequest() = 0;
     virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+    virtual Demux& getDemux() = 0;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Wed Sep 12 07:49:12 2007
@@ -68,7 +68,7 @@
             }
             arriving->append(frame);
             if (arriving->isComplete()) {
-                received.push(arriving);
+                demux.handle(arriving);
                 arriving.reset();
             }
         } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Wed Sep 12 07:49:12 2007
@@ -27,10 +27,10 @@
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MethodContent.h"
 #include "qpid/framing/SequenceNumber.h"
-#include "BlockingQueue.h"
 #include "ChainableFrameHandler.h"
 #include "CompletionTracker.h"
 #include "Correlator.h"
+#include "Demux.h"
 #include "Execution.h"
 
 namespace qpid {
@@ -46,7 +46,7 @@
     framing::FrameSet::shared_ptr arriving;
     Correlator correlation;
     CompletionTracker completion;
-    BlockingQueue<framing::FrameSet::shared_ptr> received; 
+    Demux demux;
     framing::ProtocolVersion version;
     uint64_t maxFrameSize;
     framing::AccumulatedAck completionStatus;
@@ -79,7 +79,7 @@
     void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
     Correlator& getCorrelator() { return correlation; }
     CompletionTracker& getCompletionTracker() { return completion; }
-    BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
+    Demux& getDemux() { return demux; }
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h Wed Sep 12 07:49:12 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _MessageQueue_
+#define _MessageQueue_
+#include <iostream>
+#include "BlockingQueue.h"
+#include "MessageListener.h"
+
+namespace qpid {
+namespace client {
+
+    /**
+     * A MessageListener implementation that simply queues up
+     * messages.
+     *
+     * \ingroup clientapi
+     */
+    class MessageQueue : public MessageListener, public BlockingQueue<Message>
+    {
+        std::queue<Message> messages;
+    public:
+        void received(Message& msg)
+        {
+            std::cout << "Adding message to queue: " << msg.getData() << std::endl;
+            push(msg);
+        }
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 12 07:49:12 2007
@@ -50,7 +50,7 @@
 
 FrameSet::shared_ptr SessionCore::get()
 {
-    return l3.getReceived().pop();
+    return l3.getDemux().getDefault().pop();
 }
 
 void SessionCore::setSync(bool s)
@@ -71,7 +71,7 @@
 
 void SessionCore::stop()
 {
-    l3.getReceived().close();
+    l3.getDemux().close();
     l3.getCompletionTracker().close();
 }
 
@@ -98,6 +98,8 @@
 
 Future SessionCore::send(const AMQBody& command)
 { 
+    checkClosed();
+
     Future f;
     //any result/response listeners must be set before the command is sent
     if (command.getMethod()->resultExpected()) {
@@ -120,6 +122,7 @@
 
 Future SessionCore::send(const AMQBody& command, const MethodContent& content)
 {
+    checkClosed();
     //content bearing methods don't currently have responses or
     //results, if that changes should follow procedure for the other
     //send method impl:

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Sep 12 07:49:12 2007
@@ -65,6 +65,8 @@
         session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true);
         //subcribe to the queue with confirm_mode = 1:
         session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1);
+        session.messageFlow((destination=dest, unit=0, value=1));//messages
+        session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes
         //publish a message:
         TransferContent _content(data);
         _content.getDeliveryProperties().setRoutingKey("my-queue");

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Wed Sep 12 07:49:12 2007
@@ -1,3 +1,2 @@
 tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.basic.BasicTests.test_get
 

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Wed Sep 12 07:49:12 2007
@@ -259,7 +259,16 @@
             else: self.uniqueTag += 1
             consumer_tag = "tag" + str(self.uniqueTag)
             self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
+            self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+            self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
             return self.client.queue(consumer_tag)
+
+    def subscribe(self, channel=None, **keys):
+        channel = channel or self.channel
+        consumer_tag = keys["destination"]
+        channel.message_subscribe(**keys)
+        channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+        channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
 
     def assertEmpty(self, queue):
         """Assert that the queue is empty"""

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py Wed Sep 12 07:49:12 2007
@@ -39,13 +39,13 @@
         #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
         channel.queue_declare(queue="returns", exclusive=True)
         channel.queue_bind(queue="returns", exchange="secondary")
-        channel.message_subscribe(destination="a", queue="returns")
+        self.subscribe(destination="a", queue="returns")
         returned = self.client.queue("a")
 
         #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
         channel.queue_declare(queue="processed", exclusive=True)
         channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
-        channel.message_subscribe(destination="b", queue="processed")
+        self.subscribe(destination="b", queue="processed")
         processed = self.client.queue("b")
 
         #publish to the primary exchange
@@ -73,7 +73,7 @@
         channel.exchange_declare(exchange="dlq", type="fanout")
         channel.queue_declare(queue="deleted", exclusive=True)
         channel.queue_bind(exchange="dlq", queue="deleted")
-        channel.message_subscribe(destination="dlq", queue="deleted")
+        self.subscribe(destination="dlq", queue="deleted")
         dlq = self.client.queue("dlq")
 
         #create a queue using the dlq as its alternate exchange:
@@ -103,7 +103,7 @@
         channel.exchange_declare(exchange="dlq", type="fanout")
         channel.queue_declare(queue="immediate", exclusive=True)
         channel.queue_bind(exchange="dlq", queue="immediate")
-        channel.message_subscribe(destination="dlq", queue="immediate")
+        self.subscribe(destination="dlq", queue="immediate")
         dlq = self.client.queue("dlq")
 
         #create a queue using the dlq as its alternate exchange:

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Wed Sep 12 07:49:12 2007
@@ -35,7 +35,7 @@
 
         # No ack consumer
         ctag = "tag1"
-        ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0)
+        self.subscribe(ch, queue = "myqueue", destination = ctag)
         body = "test no-ack"
         ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
         msg = self.client.queue(ctag).get(timeout = 5)
@@ -44,7 +44,9 @@
         # Acknowledging consumer
         self.queue_declare(ch, queue = "otherqueue")
         ctag = "tag2"
-        ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1)
+        self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
+        ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+        ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
         body = "test ack"
         ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
         msg = self.client.queue(ctag).get(timeout = 5)
@@ -60,7 +62,7 @@
         self.queue_declare(channel, queue="test-queue") 
         channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
         consumer_tag = "tag1"
-        channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+        self.subscribe(queue="test-queue", destination=consumer_tag)
         queue = self.client.queue(consumer_tag)
 
         body = "Immediate Delivery"
@@ -84,7 +86,7 @@
         channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
 
         consumer_tag = "tag1"
-        channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+        self.subscribe(queue="test-queue", destination=consumer_tag)
         queue = self.client.queue(consumer_tag)
         msg = queue.get(timeout=5)
         self.assert_(msg.content.body == body)
@@ -111,7 +113,7 @@
     def test_channel_flow(self):
         channel = self.channel
         channel.queue_declare(queue="flow_test_queue", exclusive=True)
-        channel.message_subscribe(destination="my-tag", queue="flow_test_queue")
+        self.subscribe(destination="my-tag", queue="flow_test_queue")
         incoming = self.client.queue("my-tag")
         
         channel.channel_flow(active=False)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Wed Sep 12 07:49:12 2007
@@ -366,7 +366,7 @@
 
         #check the second message is available, but not the first
         self.assertMessageCount(1, "tx-queue")
-        channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1)
+        self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
         msg = self.client.queue("results").get(timeout=1)
         self.assertEqual("two", msg.content['message_id'])
         channel.message_cancel(destination="results")
@@ -602,5 +602,7 @@
 
     def assertMessageId(self, expected, queue):
         self.channel.message_subscribe(queue=queue, destination="results")
+        self.channel.message_flow(destination="results", unit=0, value=1)
+        self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
         self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
         self.channel.message_cancel(destination="results")

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/example.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/example.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/example.py Wed Sep 12 07:49:12 2007
@@ -69,6 +69,8 @@
         # field that is filled if the reply includes content. In this case the
         # interesting field is the consumer_tag.
         channel.message_subscribe(queue="test-queue", destination="consumer_tag")
+        channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+        channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
 
         # We can use the Client.queue(...) method to access the queue
         # corresponding to our consumer_tag.

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Wed Sep 12 07:49:12 2007
@@ -34,8 +34,8 @@
         channel.queue_declare(queue="test-queue-1a", exclusive=True)
         channel.queue_declare(queue="test-queue-1b", exclusive=True)
         #establish two consumers one of which excludes delivery of locally sent messages
-        channel.message_subscribe(destination="local_included", queue="test-queue-1a")
-        channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
 
         #send a message
         channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
@@ -61,9 +61,9 @@
         channel.queue_declare(queue="test-queue-2", exclusive=True)
 
         #check that an exclusive consumer prevents other consumer being created:
-        channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+        self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
         try:
-            channel.message_subscribe(destination="second", queue="test-queue-2")
+            self.subscribe(destination="second", queue="test-queue-2")
             self.fail("Expected consume request to fail due to previous exclusive consumer")
         except Closed, e:
             self.assertChannelException(403, e.args[0])
@@ -73,9 +73,9 @@
         channel.channel_open()
 
         #check that an exclusive consumer cannot be created if a consumer already exists:
-        channel.message_subscribe(destination="first", queue="test-queue-2")
+        self.subscribe(channel, destination="first", queue="test-queue-2")
         try:
-            channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+            self.subscribe(channel, destination="second", queue="test-queue-2", exclusive=True)  # same channel as "first"
             self.fail("Expected exclusive consume request to fail due to previous consumer")
         except Closed, e:
             self.assertChannelException(403, e.args[0])
@@ -87,7 +87,7 @@
         channel = self.channel
         try:
             #queue specified but doesn't exist:
-            channel.message_subscribe(queue="invalid-queue")
+            self.subscribe(queue="invalid-queue", destination="")
             self.fail("Expected failure when consuming from non-existent queue")
         except Closed, e:
             self.assertChannelException(404, e.args[0])
@@ -96,7 +96,7 @@
         channel.channel_open()
         try:
             #queue not specified and none previously declared for channel:
-            channel.message_subscribe(queue="")
+            self.subscribe(channel, queue="", destination="")
             self.fail("Expected failure when consuming from unspecified queue")
         except Closed, e:
             self.assertConnectionException(530, e.args[0])
@@ -110,9 +110,9 @@
         channel.queue_declare(queue="test-queue-3", exclusive=True)
 
         #check that attempts to use duplicate tags are detected and prevented:
-        channel.message_subscribe(destination="first", queue="test-queue-3")
+        self.subscribe(destination="first", queue="test-queue-3")
         try:
-            channel.message_subscribe(destination="first", queue="test-queue-3")
+            self.subscribe(destination="first", queue="test-queue-3")
             self.fail("Expected consume request to fail due to non-unique tag")
         except Closed, e:
             self.assertConnectionException(530, e.args[0])
@@ -124,7 +124,7 @@
         channel = self.channel
         #setup, declare a queue:
         channel.queue_declare(queue="test-queue-4", exclusive=True)
-        channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
+        self.subscribe(destination="my-consumer", queue="test-queue-4")
         channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
 
         #cancel should stop messages being delivered
@@ -150,7 +150,7 @@
         channel = self.channel
         channel.queue_declare(queue="test-ack-queue", exclusive=True)
         
-        channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+        self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
         queue = self.client.queue("consumer_tag")
 
         channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
@@ -194,7 +194,7 @@
         channel = self.channel
         channel.queue_declare(queue="test-requeue", exclusive=True)
         
-        channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+        self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
         queue = self.client.queue("consumer_tag")
 
         channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
@@ -225,7 +225,7 @@
         #requeue unacked messages (Three and Five)
         channel.message_recover(requeue=True)
 
-        channel.message_subscribe(queue="test-requeue", destination="consumer_tag")
+        self.subscribe(queue="test-requeue", destination="consumer_tag")
         queue2 = self.client.queue("consumer_tag")
         
         msg3b = queue2.get(timeout=1)
@@ -256,7 +256,7 @@
         #setup: declare queue and subscribe
         channel = self.channel
         channel.queue_declare(queue="test-prefetch-count", exclusive=True)
-        subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
+        subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
         queue = self.client.queue("consumer_tag")
 
         #set prefetch to 5:
@@ -298,7 +298,7 @@
         #setup: declare queue and subscribe
         channel = self.channel
         channel.queue_declare(queue="test-prefetch-size", exclusive=True)
-        subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
+        subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
         queue = self.client.queue("consumer_tag")
 
         #set prefetch to 50 bytes (each message is 9 or 10 bytes):
@@ -345,13 +345,13 @@
         channel.queue_declare(queue = "r", exclusive=True)
         channel.queue_bind(queue = "r", exchange = "amq.fanout")
 
-        channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+        self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
         channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
         msg = self.client.queue("consumer").get(timeout = 1)
         self.assertEquals(msg.content.body, "blah, blah")
         channel.message_reject([msg.command_id, msg.command_id])
 
-        channel.message_subscribe(queue = "r", destination = "checker")
+        self.subscribe(queue = "r", destination = "checker")
         msg = self.client.queue("checker").get(timeout = 1)
         self.assertEquals(msg.content.body, "blah, blah")
 
@@ -365,8 +365,6 @@
         #create consumer (for now that defaults to infinite credit)
         channel.message_subscribe(queue = "q", destination = "c")
         channel.message_flow_mode(mode = 0, destination = "c")
-        #set credit to zero (can remove this once move to proper default for subscribe method)
-        channel.message_stop(destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -397,8 +395,6 @@
         #create consumer (for now that defaults to infinite credit)
         channel.message_subscribe(queue = "q", destination = "c")
         channel.message_flow_mode(mode = 0, destination = "c")
-        #set credit to zero (can remove this once move to proper default for subscribe method)
-        channel.message_stop(destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -431,8 +427,6 @@
         #create consumer (for now that defaults to infinite credit)
         channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
         channel.message_flow_mode(mode = 1, destination = "c")
-        #set credit to zero (can remove this once move to proper default for subscribe method)
-        channel.message_stop(destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -465,8 +459,6 @@
         #create consumer (for now that defaults to infinite credit)
         channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
         channel.message_flow_mode(mode = 1, destination = "c")
-        #set credit to zero (can remove this once move to proper default for subscribe method)
-        channel.message_stop(destination = "c")
         #send batch of messages to queue
         for i in range(1, 11):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -506,8 +498,8 @@
         for i in range(1, 6):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
 
-        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
-        channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
 
         for i in range(6, 11):
             channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
@@ -532,7 +524,7 @@
         channel.queue_declare(queue = "q", exclusive=True)
         channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
 
-        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
         msg = self.client.queue("a").get(timeout = 1)
         channel.message_acquire([msg.command_id, msg.command_id])
         msg.complete()
@@ -548,7 +540,7 @@
         channel.queue_declare(queue = "q", exclusive=True)
         channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
 
-        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
         msg = self.client.queue("a").get(timeout = 1)
         channel.message_cancel(destination = "a")
         channel.message_release([msg.command_id, msg.command_id])

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Wed Sep 12 07:49:12 2007
@@ -49,7 +49,7 @@
 
         #send a further message and consume it, ensuring that the other messages are really gone
         channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
-        channel.message_subscribe(queue="test-queue", destination="tag")
+        self.subscribe(queue="test-queue", destination="tag")
         queue = self.client.queue("tag")
         msg = queue.get(timeout=1)
         self.assertEqual("four", msg.content.body)
@@ -169,8 +169,8 @@
         channel.queue_declare(queue="queue-1", exclusive="True")
         channel.queue_declare(queue="queue-2", exclusive="True")
 
-        channel.message_subscribe(queue="queue-1", destination="queue-1")
-        channel.message_subscribe(queue="queue-2", destination="queue-2")
+        self.subscribe(queue="queue-1", destination="queue-1")
+        self.subscribe(queue="queue-2", destination="queue-2")
 
         queue1 = self.client.queue("queue-1")
         queue2 = self.client.queue("queue-2")
@@ -257,7 +257,7 @@
         channel.channel_open()
 
         #empty queue:
-        channel.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+        self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
         queue = self.client.queue("consumer_tag")
         msg = queue.get(timeout=1)
         self.assertEqual("message", msg.content.body)
@@ -282,7 +282,7 @@
         #create a queue and register a consumer:
         channel.queue_declare(queue="delete-me-3")
         channel.queue_declare(queue="delete-me-3", passive="True")
-        channel.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+        self.subscribe(destination="consumer_tag", queue="delete-me-3")
 
         #need new channel now:    
         channel2 = self.client.channel(2)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Wed Sep 12 07:49:12 2007
@@ -41,13 +41,13 @@
         channel = self.channel
         channel.tx_select()
 
-        channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1)
+        self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
         queue_a = self.client.queue("qa")
 
-        channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1)
+        self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
         queue_b = self.client.queue("qb")
 
-        channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1)
+        self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
         queue_c = self.client.queue("qc")
 
         #check results
@@ -176,7 +176,7 @@
         channel.tx_select()
 
         #consume and ack messages
-        channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1)
+        self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
         queue_a = self.client.queue("sub_a")
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
@@ -184,13 +184,13 @@
 
         msg.complete()
 
-        channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1)
+        self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
         queue_b = self.client.queue("sub_b")
         msg = queue_b.get(timeout=1)
         self.assertEqual("Message 6", msg.content.body)
         msg.complete()
 
-        sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1)
+        sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
         queue_c = self.client.queue("sub_c")
         msg = queue_c.get(timeout=1)
         self.assertEqual("Message 7", msg.content.body)