You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/12 16:49:15 UTC
svn commit: r574979 - in /incubator/qpid/trunk/qpid: cpp/rubygen/templates/
cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/
python/qpid/ python/tests_0-10/
Author: gsim
Date: Wed Sep 12 07:49:12 2007
New Revision: 574979
URL: http://svn.apache.org/viewvc?rev=574979&view=rev
Log:
In ClientChannel: Use subscribe and flush in place of get; use per-subscriber flow control for managing prefetches.
In brokers Session: set credit to 0 when subscription is created (modified python tests accordingly)
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
incubator/qpid/trunk/qpid/python/tests_0-10/example.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Wed Sep 12 07:49:12 2007
@@ -43,7 +43,7 @@
end
def printable_form(f)
- if (f.cpptype.name == "u_int8_t")
+ if (f.cpptype.name == "uint8_t")
return "(int) " + f.cppname
else
return f.cppname
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 12 07:49:12 2007
@@ -212,6 +212,7 @@
qpid/client/ClientQueue.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
+ qpid/client/Demux.cpp \
qpid/client/MessageListener.cpp \
qpid/client/Correlator.cpp \
qpid/client/CompletionTracker.cpp \
@@ -297,7 +298,9 @@
qpid/client/ConnectionImpl.h \
qpid/client/Connector.h \
qpid/client/Completion.h \
+ qpid/client/Demux.h \
qpid/client/MessageListener.h \
+ qpid/client/MessageQueue.h \
qpid/client/BlockingQueue.h \
qpid/client/Correlator.h \
qpid/client/CompletionTracker.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Sep 12 07:49:12 2007
@@ -126,7 +126,7 @@
void Queue::requestDispatch(Consumer* c, bool sync){
if (!c || c->preAcquires()) {
if (sync) {
- serializer.dispatch();
+ dispatch();
} else {
serializer.execute(dispatchCallback);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Wed Sep 12 07:49:12 2007
@@ -251,8 +251,8 @@
acquire(_acquire),
blocked(false),
windowing(true),
- msgCredit(0xFFFFFFFF),
- byteCredit(0xFFFFFFFF) {}
+ msgCredit(0),
+ byteCredit(0) {}
bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Wed Sep 12 07:49:12 2007
@@ -62,7 +62,7 @@
}
}
- void push(T t)
+ void push(const T& t)
{
sys::Monitor::ScopedLock l(lock);
bool wasEmpty = queue.empty();
@@ -77,6 +77,12 @@
sys::Monitor::ScopedLock l(lock);
closed = true;
lock.notifyAll();
+ }
+
+ bool empty()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ return queue.empty();
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Wed Sep 12 07:49:12 2007
@@ -36,7 +36,9 @@
ChannelCloseBody* closeBody=
dynamic_cast<ChannelCloseBody*>(body->getMethod());
if (closeBody) {
- setState(CLOSED);
+ setState(CLOSED_BY_PEER);
+ code = closeBody->getReplyCode();
+ text = closeBody->getReplyText();
if (onClose) {
onClose(closeBody->getReplyCode(), closeBody->getReplyText());
}
@@ -65,8 +67,10 @@
if (getState() == OPEN) {
frame.setChannel(id);
out(frame);
- } else {
+ } else if (getState() == CLOSED) {
throw Exception("Channel not open");
+ } else if (getState() == CLOSED_BY_PEER) {
+ throw ChannelException(code, text);
}
}
@@ -80,7 +84,7 @@
std::set<int> states;
states.insert(OPEN);
- states.insert(CLOSED);
+ states.insert(CLOSED_BY_PEER);
waitFor(states);
if (getState() != OPEN) {
throw Exception("Failed to open channel.");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h Wed Sep 12 07:49:12 2007
@@ -30,9 +30,12 @@
class ChannelHandler : private StateManager, public ChainableFrameHandler
{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+ enum STATES {OPENING, OPEN, CLOSING, CLOSED, CLOSED_BY_PEER};
framing::ProtocolVersion version;
uint16_t id;
+
+ uint16_t code;
+ std::string text;
void handleMethod(framing::AMQMethodBody* method);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Wed Sep 12 07:49:12 2007
@@ -26,8 +26,10 @@
#include "ClientMessage.h"
#include "qpid/QpidError.h"
#include "Connection.h"
+#include "Demux.h"
#include "FutureResponse.h"
#include "MessageListener.h"
+#include "MessageQueue.h"
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include "qpid/framing/all_method_bodies.h"
@@ -72,6 +74,9 @@
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
session = s;
+ if(isTransactional()) {
+ session.txSelect();
+ }
}
bool Channel::isOpen() const {
@@ -79,17 +84,8 @@
return active;
}
-void Channel::setQos() {
- session.basicQos((prefetchCount=getPrefetch(), global=false));
- if(isTransactional()) {
- //I think this is wrong! should only send TxSelect once...
- session.txSelect();
- }
-}
-
-void Channel::setPrefetch(uint16_t _prefetch){
+void Channel::setPrefetch(uint32_t _prefetch){
prefetch = _prefetch;
- setQos();
}
void Channel::declareExchange(Exchange& _exchange, bool synch){
@@ -157,6 +153,9 @@
session.messageSubscribe(0, _queue.getName(), tag, noLocal,
confirmMode, 0/*pre-acquire*/,
false, fields ? *fields : FieldTable());
+ //allocate some credit:
+ session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+ session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -173,21 +172,29 @@
session.messageCancel(tag);
}
-bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
- session.execution().sendFlushRequest();
- if (response.isA<BasicGetEmptyBody>()) {
+bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
+ string tag = "get-handler";
+ ScopedDivert handler(tag, session.execution().getDemux());
+ Demux::Queue& incoming = handler.getQueue();
+
+ session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)));
+ session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+ session.messageFlow(tag, 0/*MESSAGES*/, 1);
+ Completion status = session.messageFlush(tag);
+ status.sync();
+ session.messageCancel(tag);
+
+ if (incoming.empty()) {
return false;
} else {
- FrameSet::shared_ptr content = gets.pop();
- msg.populate(*content);
+ msg.populate(*(incoming.pop()));
return true;
}
}
void Channel::publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
- bool mandatory, bool /*immediate TODO-restore immediate?*/) {
+ bool mandatory, bool /*?TODO-restore immediate?*/) {
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
@@ -224,14 +231,23 @@
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
+ MessageListener* listener(0);
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
+ Message msg;
+ msg.populate(content);
+ listener = i->second.listener;
+ }
+ }
+ if (listener) {
Message msg;
msg.populate(content);
- i->second.listener->received(msg);
+ listener->received(msg);
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
- }
+ }
}
void Channel::run() {
@@ -239,12 +255,8 @@
while (true) {
FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
- if (content->isA<BasicDeliverBody>()) {
- dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
- } else if (content->isA<MessageTransferBody>()) {
+ if (content->isA<MessageTransferBody>()) {
dispatch(*content, content->as<MessageTransferBody>()->getDestination());
- } else if (content->isA<BasicGetOkBody>()) {
- gets.push(content);
} else {
QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Wed Sep 12 07:49:12 2007
@@ -71,7 +71,7 @@
mutable sys::Mutex lock;
sys::Thread dispatcher;
- uint16_t prefetch;
+ uint32_t prefetch;
const bool transactional;
framing::ProtocolVersion version;
@@ -88,7 +88,6 @@
void stop();
- void setQos();
void open(const Session& session);
void closeInternal();
void join();
@@ -110,7 +109,7 @@
* messages the channel is willing to have sent to it
* asynchronously
*/
- Channel(bool transactional = false, u_int16_t prefetch = 500);
+ Channel(bool transactional = false, u_int16_t prefetch = 0);
~Channel();
@@ -204,9 +203,9 @@
/**
* Change the prefetch in use.
*/
- void setPrefetch(uint16_t prefetch);
+ void setPrefetch(uint32_t prefetch);
- uint16_t getPrefetch() { return prefetch; }
+ uint32_t getPrefetch() { return prefetch; }
/**
* Start message dispatching on a new thread
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Wed Sep 12 07:49:12 2007
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Demux.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+#include <iostream>
+
+namespace qpid {
+namespace client {
+
+ByTransferDest::ByTransferDest(const std::string& d) : dest(d) {}
+bool ByTransferDest::operator()(const framing::FrameSet& frameset) const
+{
+ return frameset.isA<framing::MessageTransferBody>() &&
+ frameset.as<framing::MessageTransferBody>()->getDestination() == dest;
+}
+
+ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer)
+{
+ queue = &(demuxer.add(dest, ByTransferDest(dest)));
+}
+
+ScopedDivert::~ScopedDivert()
+{
+ demuxer.remove(dest);
+}
+
+Demux::Queue& ScopedDivert::getQueue()
+{
+ return *queue;
+}
+
+void Demux::handle(framing::FrameSet::shared_ptr frameset)
+{
+ sys::Mutex::ScopedLock l(lock);
+ bool matched = false;
+ for (iterator i = records.begin(); i != records.end() && !matched; i++) {
+ if (i->condition && i->condition(*frameset)) {
+ matched = true;
+ i->queue->push(frameset);
+ }
+ }
+ if (!matched) {
+ defaultQueue.push(frameset);
+ }
+}
+
+void Demux::close()
+{
+ sys::Mutex::ScopedLock l(lock);
+ for (iterator i = records.begin(); i != records.end(); i++) {
+ i->queue->close();
+ }
+ defaultQueue.close();
+}
+
+Demux::Queue& Demux::add(const std::string& name, Condition condition)
+{
+ sys::Mutex::ScopedLock l(lock);
+ iterator i = std::find_if(records.begin(), records.end(), Find(name));
+ if (i == records.end()) {
+ Record r(name, condition);
+ records.push_back(r);
+ return *(r.queue);
+ } else {
+ throw Exception("Queue already exists for " + name);
+ }
+}
+
+void Demux::remove(const std::string& name)
+{
+ sys::Mutex::ScopedLock l(lock);
+ records.remove_if(Find(name));
+}
+
+Demux::Queue& Demux::get(const std::string& name)
+{
+ sys::Mutex::ScopedLock l(lock);
+ iterator i = std::find_if(records.begin(), records.end(), Find(name));
+ if (i == records.end()) {
+ throw Exception("No queue for " + name);
+ }
+ return *(i->queue);
+}
+
+Demux::Queue& Demux::getDefault()
+{
+ return defaultQueue;
+}
+
+Demux::Find::Find(const std::string& n) : name(n) {}
+
+bool Demux::Find::operator()(const Record& record) const
+{
+ return record.name == name;
+}
+
+}}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Wed Sep 12 07:49:12 2007
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/FrameSet.h"
+#include "qpid/sys/Mutex.h"
+#include "BlockingQueue.h"
+
+#ifndef _Demux_
+#define _Demux_
+
+namespace qpid {
+namespace client {
+
+class ByTransferDest
+{
+ const std::string dest;
+public:
+ ByTransferDest(const std::string& dest);
+ bool operator()(const framing::FrameSet& frameset) const;
+};
+
+class Demux
+{
+public:
+ typedef boost::function<bool(const framing::FrameSet&)> Condition;
+ typedef BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+
+ void handle(framing::FrameSet::shared_ptr);
+ void close();
+
+ Queue& add(const std::string& name, Condition);
+ void remove(const std::string& name);
+ Queue& get(const std::string& name);
+ Queue& getDefault();
+private:
+ typedef boost::shared_ptr<Queue> QueuePtr;
+ struct Record
+ {
+ const std::string name;
+ Condition condition;
+ QueuePtr queue;
+
+ Record(const std::string& n, Condition c) : name(n), condition(c), queue(new Queue()) {}
+ };
+
+ sys::Mutex lock;
+ std::list<Record> records;
+ Queue defaultQueue;
+
+ typedef std::list<Record>::iterator iterator;
+
+ struct Find
+ {
+ const std::string name;
+ Find(const std::string& name);
+ bool operator()(const Record& record) const;
+ };
+};
+
+class ScopedDivert
+{
+ const std::string dest;
+ Demux& demuxer;
+ Demux::Queue* queue;
+public:
+ ScopedDivert(const std::string& dest, Demux& demuxer);
+ ~ScopedDivert();
+ Demux::Queue& getQueue();
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Wed Sep 12 07:49:12 2007
@@ -22,6 +22,7 @@
#define _Execution_
#include "qpid/framing/SequenceNumber.h"
+#include "Demux.h"
namespace qpid {
namespace client {
@@ -33,6 +34,7 @@
virtual void sendSyncRequest() = 0;
virtual void sendFlushRequest() = 0;
virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+ virtual Demux& getDemux() = 0;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Wed Sep 12 07:49:12 2007
@@ -68,7 +68,7 @@
}
arriving->append(frame);
if (arriving->isComplete()) {
- received.push(arriving);
+ demux.handle(arriving);
arriving.reset();
}
} else {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Wed Sep 12 07:49:12 2007
@@ -27,10 +27,10 @@
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
-#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
+#include "Demux.h"
#include "Execution.h"
namespace qpid {
@@ -46,7 +46,7 @@
framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
- BlockingQueue<framing::FrameSet::shared_ptr> received;
+ Demux demux;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
framing::AccumulatedAck completionStatus;
@@ -79,7 +79,7 @@
void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
Correlator& getCorrelator() { return correlation; }
CompletionTracker& getCompletionTracker() { return completion; }
- BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
+ Demux& getDemux() { return demux; }
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h?rev=574979&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h Wed Sep 12 07:49:12 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _MessageQueue_
+#define _MessageQueue_
+#include <iostream>
+#include "BlockingQueue.h"
+#include "MessageListener.h"
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A MessageListener implementation that simply queues up
+ * messages.
+ *
+ * \ingroup clientapi
+ */
+ class MessageQueue : public MessageListener, public BlockingQueue<Message>
+ {
+ std::queue<Message> messages;
+ public:
+ void received(Message& msg)
+ {
+ std::cout << "Adding message to queue: " << msg.getData() << std::endl;
+ push(msg);
+ }
+ };
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 12 07:49:12 2007
@@ -50,7 +50,7 @@
FrameSet::shared_ptr SessionCore::get()
{
- return l3.getReceived().pop();
+ return l3.getDemux().getDefault().pop();
}
void SessionCore::setSync(bool s)
@@ -71,7 +71,7 @@
void SessionCore::stop()
{
- l3.getReceived().close();
+ l3.getDemux().close();
l3.getCompletionTracker().close();
}
@@ -98,6 +98,8 @@
Future SessionCore::send(const AMQBody& command)
{
+ checkClosed();
+
Future f;
//any result/response listeners must be set before the command is sent
if (command.getMethod()->resultExpected()) {
@@ -120,6 +122,7 @@
Future SessionCore::send(const AMQBody& command, const MethodContent& content)
{
+ checkClosed();
//content bearing methods don't currently have responses or
//results, if that changes should follow procedure for the other
//send method impl:
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Sep 12 07:49:12 2007
@@ -65,6 +65,8 @@
session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true);
//subcribe to the queue with confirm_mode = 1:
session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1);
+ session.messageFlow((destination=dest, unit=0, value=1));//messages
+ session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes
//publish a message:
TransferContent _content(data);
_content.getDeliveryProperties().setRoutingKey("my-queue");
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Wed Sep 12 07:49:12 2007
@@ -1,3 +1,2 @@
tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.basic.BasicTests.test_get
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Wed Sep 12 07:49:12 2007
@@ -259,7 +259,16 @@
else: self.uniqueTag += 1
consumer_tag = "tag" + str(self.uniqueTag)
self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
+ self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
return self.client.queue(consumer_tag)
+
+ def subscribe(self, channel=None, **keys):
+ channel = channel or self.channel
+ consumer_tag = keys["destination"]
+ channel.message_subscribe(**keys)
+ channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
def assertEmpty(self, queue):
"""Assert that the queue is empty"""
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py Wed Sep 12 07:49:12 2007
@@ -39,13 +39,13 @@
#declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
channel.queue_declare(queue="returns", exclusive=True)
channel.queue_bind(queue="returns", exchange="secondary")
- channel.message_subscribe(destination="a", queue="returns")
+ self.subscribe(destination="a", queue="returns")
returned = self.client.queue("a")
#declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
channel.queue_declare(queue="processed", exclusive=True)
channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
- channel.message_subscribe(destination="b", queue="processed")
+ self.subscribe(destination="b", queue="processed")
processed = self.client.queue("b")
#publish to the primary exchange
@@ -73,7 +73,7 @@
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="deleted", exclusive=True)
channel.queue_bind(exchange="dlq", queue="deleted")
- channel.message_subscribe(destination="dlq", queue="deleted")
+ self.subscribe(destination="dlq", queue="deleted")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
@@ -103,7 +103,7 @@
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="immediate", exclusive=True)
channel.queue_bind(exchange="dlq", queue="immediate")
- channel.message_subscribe(destination="dlq", queue="immediate")
+ self.subscribe(destination="dlq", queue="immediate")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Wed Sep 12 07:49:12 2007
@@ -35,7 +35,7 @@
# No ack consumer
ctag = "tag1"
- ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0)
+ self.subscribe(ch, queue = "myqueue", destination = ctag)
body = "test no-ack"
ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
@@ -44,7 +44,9 @@
# Acknowledging consumer
self.queue_declare(ch, queue = "otherqueue")
ctag = "tag2"
- ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1)
+ self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
+ ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+ ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
body = "test ack"
ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
@@ -60,7 +62,7 @@
self.queue_declare(channel, queue="test-queue")
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
consumer_tag = "tag1"
- channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+ self.subscribe(queue="test-queue", destination=consumer_tag)
queue = self.client.queue(consumer_tag)
body = "Immediate Delivery"
@@ -84,7 +86,7 @@
channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
consumer_tag = "tag1"
- channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+ self.subscribe(queue="test-queue", destination=consumer_tag)
queue = self.client.queue(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.content.body == body)
@@ -111,7 +113,7 @@
def test_channel_flow(self):
channel = self.channel
channel.queue_declare(queue="flow_test_queue", exclusive=True)
- channel.message_subscribe(destination="my-tag", queue="flow_test_queue")
+ self.subscribe(destination="my-tag", queue="flow_test_queue")
incoming = self.client.queue("my-tag")
channel.channel_flow(active=False)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Wed Sep 12 07:49:12 2007
@@ -366,7 +366,7 @@
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
- channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1)
+ self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
msg = self.client.queue("results").get(timeout=1)
self.assertEqual("two", msg.content['message_id'])
channel.message_cancel(destination="results")
@@ -602,5 +602,7 @@
def assertMessageId(self, expected, queue):
self.channel.message_subscribe(queue=queue, destination="results")
+ self.channel.message_flow(destination="results", unit=0, value=1)
+ self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
self.channel.message_cancel(destination="results")
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/example.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/example.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/example.py Wed Sep 12 07:49:12 2007
@@ -69,6 +69,8 @@
# field that is filled if the reply includes content. In this case the
# interesting field is the consumer_tag.
channel.message_subscribe(queue="test-queue", destination="consumer_tag")
+ channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
# We can use the Client.queue(...) method to access the queue
# corresponding to our consumer_tag.
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Wed Sep 12 07:49:12 2007
@@ -34,8 +34,8 @@
channel.queue_declare(queue="test-queue-1a", exclusive=True)
channel.queue_declare(queue="test-queue-1b", exclusive=True)
#establish two consumers one of which excludes delivery of locally sent messages
- channel.message_subscribe(destination="local_included", queue="test-queue-1a")
- channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
@@ -61,9 +61,9 @@
channel.queue_declare(queue="test-queue-2", exclusive=True)
#check that an exclusive consumer prevents other consumer being created:
- channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
try:
- channel.message_subscribe(destination="second", queue="test-queue-2")
+ self.subscribe(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -73,9 +73,9 @@
channel.channel_open()
#check that an exclusive consumer cannot be created if a consumer already exists:
- channel.message_subscribe(destination="first", queue="test-queue-2")
+ self.subscribe(channel, destination="first", queue="test-queue-2")
try:
- channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -87,7 +87,7 @@
channel = self.channel
try:
#queue specified but doesn't exist:
- channel.message_subscribe(queue="invalid-queue")
+ self.subscribe(queue="invalid-queue", destination="")
self.fail("Expected failure when consuming from non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -96,7 +96,7 @@
channel.channel_open()
try:
#queue not specified and none previously declared for channel:
- channel.message_subscribe(queue="")
+ self.subscribe(channel, queue="", destination="")
self.fail("Expected failure when consuming from unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -110,9 +110,9 @@
channel.queue_declare(queue="test-queue-3", exclusive=True)
#check that attempts to use duplicate tags are detected and prevented:
- channel.message_subscribe(destination="first", queue="test-queue-3")
+ self.subscribe(destination="first", queue="test-queue-3")
try:
- channel.message_subscribe(destination="first", queue="test-queue-3")
+ self.subscribe(destination="first", queue="test-queue-3")
self.fail("Expected consume request to fail due to non-unique tag")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -124,7 +124,7 @@
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ self.subscribe(destination="my-consumer", queue="test-queue-4")
channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
#cancel should stop messages being delivered
@@ -150,7 +150,7 @@
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True)
- channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+ self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
@@ -194,7 +194,7 @@
channel = self.channel
channel.queue_declare(queue="test-requeue", exclusive=True)
- channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+ self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
@@ -225,7 +225,7 @@
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
- channel.message_subscribe(queue="test-requeue", destination="consumer_tag")
+ self.subscribe(queue="test-requeue", destination="consumer_tag")
queue2 = self.client.queue("consumer_tag")
msg3b = queue2.get(timeout=1)
@@ -256,7 +256,7 @@
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
+ subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 5:
@@ -298,7 +298,7 @@
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
+ subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
@@ -345,13 +345,13 @@
channel.queue_declare(queue = "r", exclusive=True)
channel.queue_bind(queue = "r", exchange = "amq.fanout")
- channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
channel.message_reject([msg.command_id, msg.command_id])
- channel.message_subscribe(queue = "r", destination = "checker")
+ self.subscribe(queue = "r", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
@@ -365,8 +365,6 @@
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -397,8 +395,6 @@
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -431,8 +427,6 @@
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -465,8 +459,6 @@
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -506,8 +498,8 @@
for i in range(1, 6):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
- channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
for i in range(6, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
@@ -532,7 +524,7 @@
channel.queue_declare(queue = "q", exclusive=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
msg = self.client.queue("a").get(timeout = 1)
channel.message_acquire([msg.command_id, msg.command_id])
msg.complete()
@@ -548,7 +540,7 @@
channel.queue_declare(queue = "q", exclusive=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
msg = self.client.queue("a").get(timeout = 1)
channel.message_cancel(destination = "a")
channel.message_release([msg.command_id, msg.command_id])
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Wed Sep 12 07:49:12 2007
@@ -49,7 +49,7 @@
#send a further message and consume it, ensuring that the other messages are really gone
channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
- channel.message_subscribe(queue="test-queue", destination="tag")
+ self.subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.content.body)
@@ -169,8 +169,8 @@
channel.queue_declare(queue="queue-1", exclusive="True")
channel.queue_declare(queue="queue-2", exclusive="True")
- channel.message_subscribe(queue="queue-1", destination="queue-1")
- channel.message_subscribe(queue="queue-2", destination="queue-2")
+ self.subscribe(queue="queue-1", destination="queue-1")
+ self.subscribe(queue="queue-2", destination="queue-2")
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
@@ -257,7 +257,7 @@
channel.channel_open()
#empty queue:
- channel.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+ self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.content.body)
@@ -282,7 +282,7 @@
#create a queue and register a consumer:
channel.queue_declare(queue="delete-me-3")
channel.queue_declare(queue="delete-me-3", passive="True")
- channel.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+ self.subscribe(destination="consumer_tag", queue="delete-me-3")
#need new channel now:
channel2 = self.client.channel(2)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=574979&r1=574978&r2=574979&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Wed Sep 12 07:49:12 2007
@@ -41,13 +41,13 @@
channel = self.channel
channel.tx_select()
- channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
queue_a = self.client.queue("qa")
- channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
queue_b = self.client.queue("qb")
- channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
queue_c = self.client.queue("qc")
#check results
@@ -176,7 +176,7 @@
channel.tx_select()
#consume and ack messages
- channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1)
+ self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
queue_a = self.client.queue("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
@@ -184,13 +184,13 @@
msg.complete()
- channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1)
+ self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.content.body)
msg.complete()
- sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1)
+ sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.content.body)