You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/01 20:01:11 UTC
svn commit: r673158 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/
qpid/sys/ tests/
Author: aconway
Date: Tue Jul 1 11:01:11 2008
New Revision: 673158
URL: http://svn.apache.org/viewvc?rev=673158&view=rev
Log:
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()
Removed:
incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Tue Jul 1 11:01:11 2008
@@ -31,14 +31,25 @@
LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
LocalQueue::~LocalQueue() {}
-Message LocalQueue::pop() {
+Message LocalQueue::pop() { return get(); }
+
+Message LocalQueue::get() {
+ Message result;
+ bool ok = get(result, sys::TIME_INFINITE);
+ assert(ok); (void) ok;
+ return result;
+}
+
+bool LocalQueue::get(Message& result, sys::Duration timeout) {
if (!queue)
throw ClosedException();
- FrameSet::shared_ptr content = queue->pop();
+ FrameSet::shared_ptr content;
+ bool ok = queue->pop(content, timeout);
+ if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
- Message m(*content);
- autoAck.ack(m, session);
- return m;
+ result = Message(*content);
+ autoAck.ack(result, session);
+ return true;
}
else
throw CommandInvalidException(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Tue Jul 1 11:01:11 2008
@@ -25,6 +25,7 @@
#include "qpid/client/Message.h"
#include "qpid/client/Demux.h"
#include "qpid/client/AckPolicy.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace client {
@@ -42,7 +43,7 @@
public:
/** Create a local queue. Subscribe the local queue to a remote broker
* queue with a SubscriptionManager.
- *
+ *
* LocalQueue is an alternative to implementing a MessageListener.
*
*@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
@@ -51,14 +52,22 @@
~LocalQueue();
- /** Pop the next message off the local queue.
+ /** Wait up to timeout for the next message from the local queue.
+ *@param result Set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if queue was empty after timeout.
+ */
+ bool get(Message& result, sys::Duration timeout=0);
+
+ /** Get the next message off the local queue, or wait for a
+ * message from the broker queue.
*@exception ClosedException if subscription has been closed.
*/
+ Message get();
+
+ /** Synonym for get(). */
Message pop();
- /** Synonym for pop(). */
- Message get() { return pop(); }
-
/** Return true if local queue is empty. */
bool empty() const;
@@ -72,10 +81,11 @@
AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
+
+ friend class SubscriptionManager;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Tue Jul 1 11:01:11 2008
@@ -129,10 +129,13 @@
dispatcher.stop();
}
-Message SubscriptionManager::get(const std::string& queue) {
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
LocalQueue lq;
- subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str());
- return lq.get();
+ std::string unique = framing::Uuid(true).str();
+ subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ AutoCancel ac(*this, unique);
+ sync(session).messageFlush(unique);
+ return lq.get(result, timeout);
}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Tue Jul 1 11:01:11 2008
@@ -122,10 +122,14 @@
const std::string& queue,
const std::string& tag=std::string());
- /**
- * Get a single message from a queue.
+
+ /** Get a single message from a queue.
+ *@param result is set to the message from the queue.
+ *@
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if no message available after timeout.
*/
- Message get(const std::string& queue);
+ bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
/** Cancel a subscription. */
void cancel(const std::string tag);
@@ -191,6 +195,15 @@
AckPolicy& getAckPolicy();
};
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+ public:
+ AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+ ~AutoCancel() { sm.cancel(tag); }
+ private:
+ SubscriptionManager& sm;
+ std::string tag;
+};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Tue Jul 1 11:01:11 2008
@@ -35,47 +35,45 @@
template <class T>
class BlockingQueue
{
- mutable sys::Waitable lock;
+ mutable sys::Waitable waitable;
std::queue<T> queue;
- bool closed;
public:
- BlockingQueue() : closed(false) {}
+ BlockingQueue() {}
~BlockingQueue() { close(); }
- /** Block until there is a value to pop */
- T pop()
- {
- Waitable::ScopedLock l(lock);
- if (!queueWait()) throw ClosedException();
- return popInternal();
- }
-
- /** Non-blocking pop. If there is a value set outValue and return
- * true, else return false;
+ /** Pop from the queue, block up to timeout if empty.
+ *@param result Set to value popped from queue.
+ *@param timeout Defaults to infinite.
+ *@return true if result was set, false if queue empty after timeout.
*/
- bool tryPop(T& outValue) {
- Waitable::ScopedLock l(lock);
+ bool pop(T& result, Duration timeout=TIME_INFINITE) {
+ Mutex::ScopedLock l(waitable);
+ {
+ Waitable::ScopedWait w(waitable);
+ AbsTime deadline(now(),timeout);
+ while (queue.empty()) waitable.wait(deadline);
+ }
if (queue.empty()) return false;
- outValue = popInternal();
+ result = queue.front();
+ queue.pop();
+ if (!queue.empty())
+ waitable.notify(); // Notify another waiter.
return true;
}
- /** Non-blocking pop. If there is a value return it, else return
- * valueIfEmpty.
- */
- T tryPop(const T& valueIfEmpty=T()) {
- T result=valueIfEmpty;
- tryPop(result);
+ T pop() {
+ T result;
+ bool ok = pop(result);
+ assert(ok); (void) ok; // Infinite wait.
return result;
}
-
+
/** Push a value onto the queue */
- void push(const T& t)
- {
- Waitable::ScopedLock l(lock);
+ void push(const T& t) {
+ Mutex::ScopedLock l(waitable);
queue.push(t);
- queueNotify(0);
+ waitable.notify(); // Notify a waiter.
}
/**
@@ -84,56 +82,33 @@
*/
void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
{
- Waitable::ScopedLock l(lock);
- if (!closed) {
- lock.setException(ex);
- closed = true;
- lock.notifyAll();
- lock.waitWaiters(); // Ensure no threads are still waiting.
+ Mutex::ScopedLock l(waitable);
+ if (!waitable.hasException()) {
+ waitable.setException(ex);
+ waitable.notifyAll();
+ waitable.waitWaiters(); // Ensure no threads are still waiting.
}
}
/** Open a closed queue. */
void open() {
- Waitable::ScopedLock l(lock);
- closed=false;
+ Mutex::ScopedLock l(waitable);
+ waitable.resetException();
}
bool isClosed() const {
- Waitable::ScopedLock l(lock);
- return closed;
+ Mutex::ScopedLock l(waitable);
+ return waitable.hasException();
}
bool empty() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.empty();
}
size_t size() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.size();
}
-
- private:
-
- void queueNotify(size_t ignore) {
- if (!queue.empty() && lock.hasWaiters()>ignore)
- lock.notify(); // Notify another waiter.
- }
-
- bool queueWait() {
- Waitable::ScopedWait w(lock);
- while (!closed && queue.empty())
- lock.wait();
- return !queue.empty();
- }
-
- T popInternal() {
- T t=queue.front();
- queue.pop();
- queueNotify(1);
- return t;
- }
-
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h Tue Jul 1 11:01:11 2008
@@ -76,6 +76,12 @@
}
+ /** True if the waitable has an exception */
+ bool hasException() const { return exception; }
+
+ /** Clear the exception if any */
+ void resetException() { exception.reset(); }
+
/** Throws an exception if one is set before or during the wait. */
void wait() {
ExCheck e(exception);
@@ -88,8 +94,6 @@
return Monitor::wait(absoluteTime);
}
- ExceptionHolder exception;
-
private:
struct ExCheck {
const ExceptionHolder& exception;
@@ -98,6 +102,8 @@
};
size_t waiters;
+ ExceptionHolder exception;
+
friend struct ScopedWait;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Tue Jul 1 11:01:11 2008
@@ -41,6 +41,7 @@
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
+using qpid::sys::TIME_SEC;
using std::string;
using std::cout;
using std::endl;
@@ -242,8 +243,11 @@
fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
fix.session.messageTransfer(content=Message("foo0", "getq"));
fix.session.messageTransfer(content=Message("foo1", "getq"));
- BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData());
- BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData());
+ Message got;
+ BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+ BOOST_CHECK_EQUAL("foo0", got.getData());
+ BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+ BOOST_CHECK_EQUAL("foo1", got.getData());
}
QPID_AUTO_TEST_CASE(testOpenFailure) {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul 1 11:01:11 2008
@@ -164,22 +164,18 @@
push_back(new BrokerFixture(opts));
}
-#if 0 // FIXME aconway 2008-06-26: TODO
-
-
+#if 0
QPID_AUTO_TEST_CASE(testWiringReplication) {
- const size_t SIZE=3;
- ClusterFixture cluster(SIZE);
+ ClusterFixture cluster(3);
Client c0(cluster[0].getPort());
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
c0.session.queueDeclare("q");
c0.session.exchangeDeclare("ex", arg::type="direct");
- BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue());
- BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType());
-
+ c0.session.close();
// Verify all brokers get wiring update.
- for (size_t i = 1; i < cluster.size(); ++i) {
+ for (size_t i = 0; i < cluster.size(); ++i) {
+ BOOST_MESSAGE("i == "<< i);
Client c(cluster[i].getPort());
BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
@@ -188,24 +184,15 @@
QPID_AUTO_TEST_CASE(testMessageReplication) {
// Enqueue on one broker, dequeue on another.
- ClusterConnections cluster;
- BOOST_REQUIRE(cluster.size() > 1);
-
- Session broker0 = cluster[0]->newSession();
- broker0.queueDeclare(queue="q");
- broker0.messageTransfer(content=TransferContent("data", "q"));
- broker0.close();
-
- Session broker1 = cluster[1]->newSession();
- broker1.
- c.session.messageSubscribe(queue="q", destination="q");
- c.session.messageFlow(destination="q", unit=0, value=1);//messages
- FrameSet::shared_ptr msg = c.session.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("data"), msg->getContent());
- c.session.getExecution().completed(msg->getId(), true, true);
- cluster[i]->close();
- }
+ ClusterFixture cluster(2);
+ Client c0(cluster[0].getPort());
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=TransferContent("data", "q"));
+ c0.session.close();
+ Client c1(cluster[1].getPort());
+ Message msg;
+ BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(string("data"), msg.getData());
}
// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.