You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/02 20:09:51 UTC

svn commit: r562212 [2/2] - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/qpid/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResponse.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+AMQMethodBody::shared_ptr FutureResponse::getResponse()
+{
+    waitForCompletion();
+    return response;
+}
+
+void FutureResponse::received(AMQMethodBody::shared_ptr r)
+{
+    Monitor::ScopedLock l(lock);
+    response = r;
+    complete = true;
+    lock.notifyAll();
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResponse_
+#define _FutureResponse_
+
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class FutureResponse : public FutureCompletion 
+{
+    framing::AMQMethodBody::shared_ptr response;
+
+public:
+    framing::AMQMethodBody::shared_ptr getResponse();
+    void received(framing::AMQMethodBody::shared_ptr response);
+};
+
+}}
+
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReceivedContent.h"
+
+using qpid::client::ReceivedContent;
+using namespace qpid::framing;
+using namespace boost;
+
+ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
+
+void ReceivedContent::append(AMQBody::shared_ptr part)
+{
+    parts.push_back(part);
+}
+
+bool ReceivedContent::isComplete() const
+{
+    if (parts.empty()) {
+         return false;
+    } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+        AMQHeaderBody::shared_ptr headers(getHeaders());
+        return headers && headers->getContentSize() == getContentSize();
+    } else if (isA<MessageTransferBody>()) {
+        //no longer support references, headers and data are still method fields
+        return true;
+    } else {
+        throw Exception("Unknown content class");
+    }
+}
+
+
+AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+{
+    return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+}
+
+AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+{
+    return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+}
+
+uint64_t ReceivedContent::getContentSize() const
+{
+    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+        uint64_t size(0);
+        for (uint i = 2; i < parts.size(); i++) {
+            size += parts[i]->size();
+        }
+        return size;
+    } else if (isA<MessageTransferBody>()) {
+        return as<MessageTransferBody>()->getBody().getValue().size();
+    } else {
+        throw Exception("Unknown content class");
+    }    
+}
+
+std::string ReceivedContent::getContent() const
+{
+    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+        string data;
+        for (uint i = 2; i < parts.size(); i++) {
+            data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+        }
+        return data;
+    } else if (isA<MessageTransferBody>()) {
+        return as<MessageTransferBody>()->getBody().getValue();
+    } else {
+        throw Exception("Unknown content class");
+    }
+}
+
+void ReceivedContent::populate(Message& msg)
+{
+    if (!isComplete()) throw Exception("Incomplete message");
+
+    if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+        BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+        BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
+        msg.setData(getContent());
+    } else if (isA<MessageTransferBody>()) {
+        throw Exception("Transfer not yet supported");
+    } else {
+        throw Exception("Unknown content class");
+    }    
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "ClientMessage.h"
+
+#ifndef _ReceivedContent_
+#define _ReceivedContent_
+
+namespace qpid {
+namespace client {
+
+/**
+ * Collects the frames representing some received 'content'. This
+ * provides a raw interface to 'message' data and attributes.
+ */
+class ReceivedContent
+{
+    const framing::SequenceNumber id;
+    std::vector<framing::AMQBody::shared_ptr> parts;
+
+public:
+    typedef boost::shared_ptr<ReceivedContent> shared_ptr;
+
+    ReceivedContent(const framing::SequenceNumber& id);
+    void append(framing::AMQBody::shared_ptr part);
+    bool isComplete() const;
+
+    uint64_t getContentSize() const;
+    std::string getContent() const;
+
+    framing::AMQMethodBody::shared_ptr getMethod() const;
+    framing::AMQHeaderBody::shared_ptr getHeaders() const;
+     
+    template <class T> bool isA() const {
+        framing::AMQMethodBody::shared_ptr method = getMethod();
+        if (!method) {
+            return false;
+        } else {
+            return method->isA<T>();
+        }
+    }
+
+    template <class T> boost::shared_ptr<T> as() const {
+        framing::AMQMethodBody::shared_ptr method = getMethod();
+        if (method && method->isA<T>()) {
+            return boost::dynamic_pointer_cast<T>(method);
+        } else {
+            return boost::shared_ptr<T>();
+        }
+    }    
+
+    const framing::SequenceNumber& getId() const { return id; }
+
+    void populate(Message& msg);
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "StateManager.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+StateManager::StateManager(int s) : state(s) {}
+
+void StateManager::waitForStateChange(int current)
+{
+    Monitor::ScopedLock l(stateLock);
+    while (state == current) {
+        stateLock.wait();
+    }
+}
+
+void StateManager::waitFor(int desired)
+{
+    Monitor::ScopedLock l(stateLock);
+    while (state != desired) {
+        stateLock.wait();
+    }
+}
+
+void StateManager::waitFor(std::set<int> desired)
+{
+    Monitor::ScopedLock l(stateLock);
+    while (desired.find(state) == desired.end()) {
+        stateLock.wait();
+    }
+}
+
+
+void StateManager::setState(int s)
+{
+    Monitor::ScopedLock l(stateLock);
+    state = s;
+    stateLock.notifyAll();
+}
+
+int StateManager::getState()
+{
+    Monitor::ScopedLock l(stateLock);
+    return state;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _StateManager_
+#define _StateManager_
+
+#include <set>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class StateManager
+{
+    int state;
+    sys::Monitor stateLock;
+    
+public:
+    StateManager(int initial);
+    void setState(int state);
+    int getState();
+    void waitForStateChange(int current);
+    void waitFor(std::set<int> states);
+    void waitFor(int state);
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Aug  2 11:09:48 2007
@@ -31,3 +31,4 @@
 Serializer
 ais_tests
 Visitor
+qpidd.vglog

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Thu Aug  2 11:09:48 2007
@@ -410,12 +410,14 @@
         ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
-        ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
         ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
-        ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++);
     }
  };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp Thu Aug  2 11:09:48 2007
@@ -61,12 +61,17 @@
     }
 };
 
+void execute(Serializer& s, Serializer::Task t) 
+{
+    s.execute(t);
+}
+
 BOOST_AUTO_TEST_CASE(testSingleThread) {
     // Verify that we call in the same thread by default.
     Tester tester;
     Serializer s;
     for (int i = 0; i < 100; ++i) 
-        s.execute(boost::bind(&Tester::test, &tester));
+        execute(s, boost::bind(&Tester::test, &tester));
     // All should be executed in this thread.
     BOOST_CHECK_EQUAL(0u, tester.collisions);
     BOOST_CHECK_EQUAL(100u, tester.count);
@@ -80,7 +85,7 @@
     Tester tester;
     Serializer s(false);
     for (int i = 0; i < 100; ++i)
-        s.execute(boost::bind(&Tester::test, &tester));
+        execute(s, boost::bind(&Tester::test, &tester));
     {
         // Wait for dispatch thread to complete.
         Mutex::ScopedLock l(tester.lock);
@@ -95,7 +100,7 @@
 
 struct Caller : public Runnable, public Tester {
     Caller(Serializer& s) : serializer(s) {}
-    void run() { serializer.execute(boost::bind(&Tester::test, this)); }
+    void run() { execute(serializer, boost::bind(&Tester::test, this)); }
     Serializer& serializer;
 };
 
@@ -134,7 +139,7 @@
     serializer.reset(new Serializer(false, &notifyDispatch));
     Tester tester;
     for (int i = 0; i < 100; ++i) 
-        serializer->execute(boost::bind(&Tester::test, &tester));
+        execute(*serializer, boost::bind(&Tester::test, &tester));
     {
         // Wait for dispatch thread to complete.
         Mutex::ScopedLock l(tester.lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Thu Aug  2 11:09:48 2007
@@ -41,7 +41,6 @@
 using namespace qpid::sys;
 using std::string;
 
-bool verbose = false;
 
 /**
  * A simple message listener implementation that prints out the
@@ -50,9 +49,10 @@
  */
 class SimpleListener : public virtual MessageListener{
     Monitor* monitor;
+    bool verbose;
 
 public:
-    inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
+    inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {}
 
     inline virtual void received(Message& msg){
         if (verbose)
@@ -101,7 +101,7 @@
 	//montior to use to notify the main thread when that message
 	//is received.
 	Monitor monitor;
-	SimpleListener listener(&monitor);
+	SimpleListener listener(&monitor, opts.trace);
 	string tag("MyTag");
 	channel.consume(queue, tag, &listener);
 	if (opts.trace) std::cout << "Registered consumer." << std::endl;
@@ -118,11 +118,6 @@
 	msg.setData(data);
 	channel.publish(msg, exchange, "MyTopic");
 	if (opts.trace) std::cout << "Published message: " << data << std::endl;
-        if (opts.trace) {
-            std::cout << "Publication " 
-                      << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ") 
-                      << "complete"  << std::endl;
-        }
 
 	{
             Monitor::ScopedLock l(monitor);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Thu Aug  2 11:09:48 2007
@@ -111,8 +111,7 @@
             channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
             //set up listener
             Listener listener(&channel, response.getName(), args.transactional);
-            string tag;
-            channel.consume(control, tag, &listener, AckMode(args.ackmode));
+            channel.consume(control, "c1", &listener, AckMode(args.ackmode));
             cout << "topic_listener: Consuming." << endl;
             channel.run();
             connection.close();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Thu Aug  2 11:09:48 2007
@@ -121,8 +121,7 @@
 
             //set up listener
             Publisher publisher(&channel, "topic_control", args.transactional);
-            string tag("mytag");
-            channel.consume(response, tag, &publisher, AckMode(args.ackmode));
+            channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
             channel.start();
 
             int batchSize(args.batches);

Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Thu Aug  2 11:09:48 2007
@@ -26,7 +26,7 @@
     self.frame = frame
     self.method = frame.method_type
     self.content = content
-    if self.method.klass.name != "execution":
+    if self.method.is_l4_command():
       self.command_id = self.channel.incoming_completion.sequence.next()
       #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
       

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Thu Aug  2 11:09:48 2007
@@ -376,7 +376,7 @@
 
   def next_command(self, method):
     #the following test is a hack until the track/sub-channel is available
-    if method.klass.name != "execution":
+    if method.is_l4_command():
       self.command_id = self.sequence.next()
 
   def reset(self):
@@ -424,13 +424,6 @@
     self.sequence = Sequence(1) #issues ids for incoming commands
     self.mark = 0               #id of last command of whose completion notification was sent to the other peer
     self.channel = channel
-
-  def next_id(self, method):
-    #the following test is a hack until the track/sub-channel is available
-    if method.klass.name != "execution":
-      return self.sequence.next()
-    else:
-      return 0
 
   def reset(self):
     self.sequence = Sequence(1) #reset counter

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Thu Aug  2 11:09:48 2007
@@ -191,6 +191,9 @@
     self.docs = docs
     self.response = False
 
+  def is_l4_command(self):
+    return self.klass.name not in ["execution", "channel", "connection"]
+
   def arguments(self, *args, **kwargs):
     nargs = len(args) + len(kwargs)
     maxargs = len(self.fields)