You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/07 17:06:32 UTC
svn commit: r592803 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/
qpid/client/ qpid/sys/ tests/
Author: aconway
Date: Wed Nov 7 08:06:31 2007
New Revision: 592803
URL: http://svn.apache.org/viewvc?rev=592803&view=rev
Log:
Added LocalQueue subscriptions. LocalQueue::pop() provides a "pull"
alternative to the MessageListener::received() "push" API.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Wed Nov 7 08:06:31 2007
@@ -46,4 +46,6 @@
const char* Exception::what() const throw() { return str().c_str(); }
+const std::string ClosedException::CLOSED_MESSAGE("Closed");
+
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Wed Nov 7 08:06:31 2007
@@ -61,6 +61,11 @@
: Exception(message), code(code_) {}
};
-} // namespace qpid
+struct ClosedException : public Exception {
+ static const std::string CLOSED_MESSAGE;
+ ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {}
+};
+} // namespace qpid
+
#endif /*!_Exception_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Wed Nov 7 08:06:31 2007
@@ -179,7 +179,7 @@
bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
string tag = "get-handler";
ScopedDivert handler(tag, session.execution().getDemux());
- Demux::Queue& incoming = handler.getQueue();
+ Demux::QueuePtr incoming = handler.getQueue();
session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
@@ -189,7 +189,7 @@
session.messageCancel(tag);
FrameSet::shared_ptr p;
- if (incoming.tryPop(p)) {
+ if (incoming->tryPop(p)) {
msg.populate(*p);
if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
return true;
@@ -265,7 +265,7 @@
QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
}
}
- } catch (const sys::QueueClosed&) {}
+ } catch (const ClosedException&) {}
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Wed Nov 7 08:06:31 2007
@@ -37,7 +37,7 @@
ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer)
{
- queue = &(demuxer.add(dest, ByTransferDest(dest)));
+ queue = demuxer.add(dest, ByTransferDest(dest));
}
ScopedDivert::~ScopedDivert()
@@ -45,9 +45,9 @@
demuxer.remove(dest);
}
-Demux::Queue& ScopedDivert::getQueue()
+Demux::QueuePtr ScopedDivert::getQueue()
{
- return *queue;
+ return queue;
}
void Demux::handle(framing::FrameSet::shared_ptr frameset)
@@ -61,7 +61,7 @@
}
}
if (!matched) {
- defaultQueue.push(frameset);
+ defaultQueue->push(frameset);
}
}
@@ -71,17 +71,17 @@
for (iterator i = records.begin(); i != records.end(); i++) {
i->queue->close();
}
- defaultQueue.close();
+ defaultQueue->close();
}
-Demux::Queue& Demux::add(const std::string& name, Condition condition)
+Demux::QueuePtr Demux::add(const std::string& name, Condition condition)
{
sys::Mutex::ScopedLock l(lock);
iterator i = std::find_if(records.begin(), records.end(), Find(name));
if (i == records.end()) {
Record r(name, condition);
records.push_back(r);
- return *(r.queue);
+ return r.queue;
} else {
throw Exception("Queue already exists for " + name);
}
@@ -93,17 +93,17 @@
records.remove_if(Find(name));
}
-Demux::Queue& Demux::get(const std::string& name)
+Demux::QueuePtr Demux::get(const std::string& name)
{
sys::Mutex::ScopedLock l(lock);
iterator i = std::find_if(records.begin(), records.end(), Find(name));
if (i == records.end()) {
throw Exception("No queue for " + name);
}
- return *(i->queue);
+ return i->queue;
}
-Demux::Queue& Demux::getDefault()
+Demux::QueuePtr Demux::getDefault()
{
return defaultQueue;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Wed Nov 7 08:06:31 2007
@@ -45,16 +45,19 @@
public:
typedef boost::function<bool(const framing::FrameSet&)> Condition;
typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+ typedef boost::shared_ptr<Queue> QueuePtr;
+ Demux() : defaultQueue(new Queue()) {}
+
void handle(framing::FrameSet::shared_ptr);
void close();
- Queue& add(const std::string& name, Condition);
+ QueuePtr add(const std::string& name, Condition);
void remove(const std::string& name);
- Queue& get(const std::string& name);
- Queue& getDefault();
+ QueuePtr get(const std::string& name);
+ QueuePtr getDefault();
+
private:
- typedef boost::shared_ptr<Queue> QueuePtr;
struct Record
{
const std::string name;
@@ -66,7 +69,7 @@
sys::Mutex lock;
std::list<Record> records;
- Queue defaultQueue;
+ QueuePtr defaultQueue;
typedef std::list<Record>::iterator iterator;
@@ -82,15 +85,14 @@
{
const std::string dest;
Demux& demuxer;
- Demux::Queue* queue;
+ Demux::QueuePtr queue;
public:
ScopedDivert(const std::string& dest, Demux& demuxer);
~ScopedDivert();
- Demux::Queue& getQueue();
+ Demux::QueuePtr getQueue();
};
-}
-}
+}} // namespace qpid::client
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Nov 7 08:06:31 2007
@@ -62,14 +62,14 @@
void Dispatcher::run()
{
- sys::BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ?
+ Demux::QueuePtr q = queue.empty() ?
session.execution().getDemux().getDefault() :
session.execution().getDemux().get(queue);
startRunning();
stopped = false;
while (!isStopped()) {
- FrameSet::shared_ptr content = q.pop();
+ FrameSet::shared_ptr content = q->pop();
if (content->isA<MessageTransferBody>()) {
Message msg(*content);
Subscriber::shared_ptr listener = find(msg.getDestination());
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=592803&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Wed Nov 7 08:06:31 2007
@@ -0,0 +1,52 @@
+#ifndef QPID_CLIENT_LOCALQUEUE_H
+#define QPID_CLIENT_LOCALQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/BlockingQueue.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Local representation of a remote queue.
+ */
+class LocalQueue
+{
+ public:
+ LocalQueue(BlockingQueue& q) : queue(q) {}
+ ~LocalQueue();
+
+ /** Pop the next message off the queue.
+ *@exception ClosedException if subscription has been closed.
+ */
+ Message pop() { return queue->pop(); } // NOTE(review): queue is a reference member, so '->' looks wrong - confirm intended type
+
+ private:
+ BlockingQueue& queue;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_LOCALQUEUE_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Nov 7 08:06:31 2007
@@ -162,7 +162,7 @@
FrameSet::shared_ptr SessionCore::get() { // user thread
// No lock here: pop does a blocking wait.
- return l3.getDemux().getDefault().pop();
+ return l3.getDemux().getDefault()->pop();
}
void SessionCore::open(uint32_t detachedLifetime) { // user thread
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Wed Nov 7 08:06:31 2007
@@ -29,8 +29,6 @@
namespace qpid {
namespace sys {
-struct QueueClosed {};
-
template <class T>
class BlockingQueue
{
@@ -46,7 +44,7 @@
T pop()
{
Waitable::ScopedLock l(lock);
- if (!queueWait()) throw QueueClosed();
+ if (!queueWait()) throw ClosedException();
return popInternal();
}
@@ -78,7 +76,7 @@
}
/**
- * Close the queue. Throws QueueClosed in threads waiting in pop().
+ * Close the queue. Throws ClosedException in threads waiting in pop().
* Blocks till all waiting threads have been notified.
*/
void close()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Wed Nov 7 08:06:31 2007
@@ -83,7 +83,7 @@
QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
- catch (const sys::QueueClosed&) {
+ catch (const ClosedException&) {
return;
}
}