You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/02 20:09:51 UTC
svn commit: r562212 [2/2] - in /incubator/qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/qpid/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResponse.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+AMQMethodBody::shared_ptr FutureResponse::getResponse()
+{
+ waitForCompletion();
+ return response;
+}
+
+void FutureResponse::received(AMQMethodBody::shared_ptr r)
+{
+ Monitor::ScopedLock l(lock);
+ response = r;
+ complete = true;
+ lock.notifyAll();
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResponse_
+#define _FutureResponse_
+
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class FutureResponse : public FutureCompletion
+{
+ framing::AMQMethodBody::shared_ptr response;
+
+public:
+ framing::AMQMethodBody::shared_ptr getResponse();
+ void received(framing::AMQMethodBody::shared_ptr response);
+};
+
+}}
+
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReceivedContent.h"
+
+using qpid::client::ReceivedContent;
+using namespace qpid::framing;
+using namespace boost;
+
+ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
+
+void ReceivedContent::append(AMQBody::shared_ptr part)
+{
+ parts.push_back(part);
+}
+
+bool ReceivedContent::isComplete() const
+{
+ if (parts.empty()) {
+ return false;
+ } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ AMQHeaderBody::shared_ptr headers(getHeaders());
+ return headers && headers->getContentSize() == getContentSize();
+ } else if (isA<MessageTransferBody>()) {
+ //no longer support references, headers and data are still method fields
+ return true;
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+
+AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+{
+ return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+}
+
+AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+{
+ return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+}
+
+uint64_t ReceivedContent::getContentSize() const
+{
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ uint64_t size(0);
+ for (uint i = 2; i < parts.size(); i++) {
+ size += parts[i]->size();
+ }
+ return size;
+ } else if (isA<MessageTransferBody>()) {
+ return as<MessageTransferBody>()->getBody().getValue().size();
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+std::string ReceivedContent::getContent() const
+{
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ string data;
+ for (uint i = 2; i < parts.size(); i++) {
+ data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+ }
+ return data;
+ } else if (isA<MessageTransferBody>()) {
+ return as<MessageTransferBody>()->getBody().getValue();
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+void ReceivedContent::populate(Message& msg)
+{
+ if (!isComplete()) throw Exception("Incomplete message");
+
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+ BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
+ msg.setData(getContent());
+ } else if (isA<MessageTransferBody>()) {
+ throw Exception("Transfer not yet supported");
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "ClientMessage.h"
+
+#ifndef _ReceivedContent_
+#define _ReceivedContent_
+
+namespace qpid {
+namespace client {
+
+/**
+ * Collects the frames representing some received 'content'. This
+ * provides a raw interface to 'message' data and attributes.
+ */
+class ReceivedContent
+{
+ const framing::SequenceNumber id;
+ std::vector<framing::AMQBody::shared_ptr> parts;
+
+public:
+ typedef boost::shared_ptr<ReceivedContent> shared_ptr;
+
+ ReceivedContent(const framing::SequenceNumber& id);
+ void append(framing::AMQBody::shared_ptr part);
+ bool isComplete() const;
+
+ uint64_t getContentSize() const;
+ std::string getContent() const;
+
+ framing::AMQMethodBody::shared_ptr getMethod() const;
+ framing::AMQHeaderBody::shared_ptr getHeaders() const;
+
+ template <class T> bool isA() const {
+ framing::AMQMethodBody::shared_ptr method = getMethod();
+ if (!method) {
+ return false;
+ } else {
+ return method->isA<T>();
+ }
+ }
+
+ template <class T> boost::shared_ptr<T> as() const {
+ framing::AMQMethodBody::shared_ptr method = getMethod();
+ if (method && method->isA<T>()) {
+ return boost::dynamic_pointer_cast<T>(method);
+ } else {
+ return boost::shared_ptr<T>();
+ }
+ }
+
+ const framing::SequenceNumber& getId() const { return id; }
+
+ void populate(Message& msg);
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "StateManager.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+StateManager::StateManager(int s) : state(s) {}
+
+void StateManager::waitForStateChange(int current)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (state == current) {
+ stateLock.wait();
+ }
+}
+
+void StateManager::waitFor(int desired)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (state != desired) {
+ stateLock.wait();
+ }
+}
+
+void StateManager::waitFor(std::set<int> desired)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (desired.find(state) == desired.end()) {
+ stateLock.wait();
+ }
+}
+
+
+void StateManager::setState(int s)
+{
+ Monitor::ScopedLock l(stateLock);
+ state = s;
+ stateLock.notifyAll();
+}
+
+int StateManager::getState()
+{
+ Monitor::ScopedLock l(stateLock);
+ return state;
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _StateManager_
+#define _StateManager_
+
+#include <set>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class StateManager
+{
+ int state;
+ sys::Monitor stateLock;
+
+public:
+ StateManager(int initial);
+ void setState(int state);
+ int getState();
+ void waitForStateChange(int current);
+ void waitFor(std::set<int> states);
+ void waitFor(int state);
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Aug 2 11:09:48 2007
@@ -31,3 +31,4 @@
Serializer
ais_tests
Visitor
+qpidd.vglog
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Thu Aug 2 11:09:48 2007
@@ -410,12 +410,14 @@
ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++);
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp Thu Aug 2 11:09:48 2007
@@ -61,12 +61,17 @@
}
};
+void execute(Serializer& s, Serializer::Task t)
+{
+ s.execute(t);
+}
+
BOOST_AUTO_TEST_CASE(testSingleThread) {
// Verify that we call in the same thread by default.
Tester tester;
Serializer s;
for (int i = 0; i < 100; ++i)
- s.execute(boost::bind(&Tester::test, &tester));
+ execute(s, boost::bind(&Tester::test, &tester));
// All should be executed in this thread.
BOOST_CHECK_EQUAL(0u, tester.collisions);
BOOST_CHECK_EQUAL(100u, tester.count);
@@ -80,7 +85,7 @@
Tester tester;
Serializer s(false);
for (int i = 0; i < 100; ++i)
- s.execute(boost::bind(&Tester::test, &tester));
+ execute(s, boost::bind(&Tester::test, &tester));
{
// Wait for dispatch thread to complete.
Mutex::ScopedLock l(tester.lock);
@@ -95,7 +100,7 @@
struct Caller : public Runnable, public Tester {
Caller(Serializer& s) : serializer(s) {}
- void run() { serializer.execute(boost::bind(&Tester::test, this)); }
+ void run() { execute(serializer, boost::bind(&Tester::test, this)); }
Serializer& serializer;
};
@@ -134,7 +139,7 @@
serializer.reset(new Serializer(false, ¬ifyDispatch));
Tester tester;
for (int i = 0; i < 100; ++i)
- serializer->execute(boost::bind(&Tester::test, &tester));
+ execute(*serializer, boost::bind(&Tester::test, &tester));
{
// Wait for dispatch thread to complete.
Mutex::ScopedLock l(tester.lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Thu Aug 2 11:09:48 2007
@@ -41,7 +41,6 @@
using namespace qpid::sys;
using std::string;
-bool verbose = false;
/**
* A simple message listener implementation that prints out the
@@ -50,9 +49,10 @@
*/
class SimpleListener : public virtual MessageListener{
Monitor* monitor;
+ bool verbose;
public:
- inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
+ inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {}
inline virtual void received(Message& msg){
if (verbose)
@@ -101,7 +101,7 @@
//montior to use to notify the main thread when that message
//is received.
Monitor monitor;
- SimpleListener listener(&monitor);
+ SimpleListener listener(&monitor, opts.trace);
string tag("MyTag");
channel.consume(queue, tag, &listener);
if (opts.trace) std::cout << "Registered consumer." << std::endl;
@@ -118,11 +118,6 @@
msg.setData(data);
channel.publish(msg, exchange, "MyTopic");
if (opts.trace) std::cout << "Published message: " << data << std::endl;
- if (opts.trace) {
- std::cout << "Publication "
- << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ")
- << "complete" << std::endl;
- }
{
Monitor::ScopedLock l(monitor);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Thu Aug 2 11:09:48 2007
@@ -111,8 +111,7 @@
channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
//set up listener
Listener listener(&channel, response.getName(), args.transactional);
- string tag;
- channel.consume(control, tag, &listener, AckMode(args.ackmode));
+ channel.consume(control, "c1", &listener, AckMode(args.ackmode));
cout << "topic_listener: Consuming." << endl;
channel.run();
connection.close();
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp Thu Aug 2 11:09:48 2007
@@ -121,8 +121,7 @@
//set up listener
Publisher publisher(&channel, "topic_control", args.transactional);
- string tag("mytag");
- channel.consume(response, tag, &publisher, AckMode(args.ackmode));
+ channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
channel.start();
int batchSize(args.batches);
Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Thu Aug 2 11:09:48 2007
@@ -26,7 +26,7 @@
self.frame = frame
self.method = frame.method_type
self.content = content
- if self.method.klass.name != "execution":
+ if self.method.is_l4_command():
self.command_id = self.channel.incoming_completion.sequence.next()
#print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Thu Aug 2 11:09:48 2007
@@ -376,7 +376,7 @@
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
- if method.klass.name != "execution":
+ if method.is_l4_command():
self.command_id = self.sequence.next()
def reset(self):
@@ -424,13 +424,6 @@
self.sequence = Sequence(1) #issues ids for incoming commands
self.mark = 0 #id of last command of whose completion notification was sent to the other peer
self.channel = channel
-
- def next_id(self, method):
- #the following test is a hack until the track/sub-channel is available
- if method.klass.name != "execution":
- return self.sequence.next()
- else:
- return 0
def reset(self):
self.sequence = Sequence(1) #reset counter
Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Thu Aug 2 11:09:48 2007
@@ -191,6 +191,9 @@
self.docs = docs
self.response = False
+ def is_l4_command(self):
+ return self.klass.name not in ["execution", "channel", "connection"]
+
def arguments(self, *args, **kwargs):
nargs = len(args) + len(kwargs)
maxargs = len(self.fields)