You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/14 16:02:12 UTC
svn commit: r507560 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/
lib/client/ lib/common/ lib/common/sys/ tests/
Author: aconway
Date: Wed Feb 14 07:02:10 2007
New Revision: 507560
URL: http://svn.apache.org/viewvc?view=rev&rev=507560
Log:
* cpp/lib/common/sys/ProducerConsumer.h:
General-purpose producer-consumer synchronization. Anywhere we have
producer/consumer threads in qpid we should re-use this sync object
rather than re-inventing the synchronization each time.
* cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement
into ScopedIncrement.h
* cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a
self contained in-process client + broker convenience for tests.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h (with props)
incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp (with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h
incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Feb 14 07:02:10 2007
@@ -327,9 +327,9 @@
)
{
try{
- if(id != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
std::stringstream out;
- out << "Attempt to use unopened channel: " << id;
+ out << "Attempt to use unopened channel: " << getId();
throw ConnectionException(504, out.str());
} else {
method->invoke(*adapter, context);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Feb 14 07:02:10 2007
@@ -78,7 +78,6 @@
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
Connection& connection;
- u_int16_t id;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Wed Feb 14 07:02:10 2007
@@ -54,11 +54,6 @@
return *proxy;
}
-AMQMethodBody::shared_ptr Channel::brokerResponse() {
- // FIXME aconway 2007-02-08: implement responses.
- return AMQMethodBody::shared_ptr();
-}
-
void Channel::open(ChannelId id, Connection& con)
{
if (isOpen())
@@ -482,7 +477,6 @@
u_int16_t code, const std::string& text,
ClassId classId, MethodId methodId)
{
- // FIXME aconway 2007-01-26: Locking?
if (getId() != 0 && isOpen()) {
try {
sendAndReceive<ChannelCloseOkBody>(
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Wed Feb 14 07:02:10 2007
@@ -44,6 +44,7 @@
namespace framing {
class ChannelCloseBody;
class AMQP_ServerProxy;
+class AMQMethodBody;
}
namespace client {
@@ -89,10 +90,13 @@
u_int64_t lastDeliveryTag;
};
typedef std::map<std::string, Consumer> ConsumerMap;
+ typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
+
static const std::string OK;
Connection* connection;
sys::Thread dispatcher;
+ IncomingMethods incomingMethods;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
@@ -367,12 +371,12 @@
* Returns a proxy for the "raw" AMQP broker protocol. Only for use by
* protocol experts.
*/
-
framing::AMQP_ServerProxy& brokerProxy();
+
/**
* Wait for the next method from the broker.
*/
- framing::AMQMethodBody::shared_ptr brokerResponse();
+ framing::AMQMethodBody::shared_ptr receive();
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am Wed Feb 14 07:02:10 2007
@@ -85,7 +85,8 @@
ExceptionHolder.cpp \
QpidError.cpp \
sys/Runnable.cpp \
- sys/Time.cpp
+ sys/Time.cpp \
+ sys/ProducerConsumer.cpp
nobase_pkginclude_HEADERS = \
$(gen)/AMQP_HighestVersion.h \
@@ -132,7 +133,8 @@
sys/Socket.h \
sys/Thread.h \
sys/Time.h \
- sys/TimeoutHandler.h
+ sys/TimeoutHandler.h \
+ sys/ProducerConsumer.h
# Force build during dist phase so help2man will work.
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/AtomicCount.h Wed Feb 14 07:02:10 2007
@@ -20,7 +20,7 @@
*/
#include <boost/detail/atomic_count.hpp>
-#include <boost/noncopyable.hpp>
+#include "ScopedIncrement.h"
namespace qpid {
namespace sys {
@@ -30,26 +30,8 @@
*/
class AtomicCount : boost::noncopyable {
public:
- class ScopedDecrement : boost::noncopyable {
- public:
- /** Decrement counter in constructor and increment in destructor. */
- ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
- ~ScopedDecrement() { ++count; }
- /** Return the value returned by the decrement. */
- operator long() { return value; }
- private:
- AtomicCount& count;
- long value;
- };
-
- class ScopedIncrement : boost::noncopyable {
- public:
- /** Increment counter in constructor and increment in destructor. */
- ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
- ~ScopedIncrement() { --count; }
- private:
- AtomicCount& count;
- };
+ typedef ScopedDecrement<AtomicCount> ScopedDecrement;
+ typedef ScopedIncrement<AtomicCount> ScopedIncrement;
AtomicCount(long value = 0) : count(value) {}
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp Wed Feb 14 07:02:10 2007
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "QpidError.h"
+#include "ScopedIncrement.h"
+#include "ProducerConsumer.h"
+
+namespace qpid {
+namespace sys {
+
+// // ================ ProducerConsumer
+
+ProducerConsumer::ProducerConsumer(size_t init_items)
+ : items(init_items), waiters(0), stopped(false)
+{}
+
+void ProducerConsumer::stop() {
+ Mutex::ScopedLock l(monitor);
+ stopped = true;
+ monitor.notifyAll();
+ // Wait for waiting consumers to wake up.
+ while (waiters > 0)
+ monitor.wait();
+}
+
+size_t ProducerConsumer::available() const {
+ Mutex::ScopedLock l(monitor);
+ return items;
+}
+
+size_t ProducerConsumer::consumers() const {
+ Mutex::ScopedLock l(monitor);
+ return waiters;
+}
+
+// ================ Lock
+
+ProducerConsumer::Lock::Lock(ProducerConsumer& p)
+ : pc(p), lock(p.monitor), status(INCOMPLETE) {}
+
+bool ProducerConsumer::Lock::isOk() const {
+ return !pc.isStopped() && status==INCOMPLETE;
+}
+
+void ProducerConsumer::Lock::checkOk() const {
+ assert(!pc.isStopped());
+ assert(status == INCOMPLETE);
+}
+
+ProducerConsumer::Lock::~Lock() {
+ assert(status != INCOMPLETE || pc.isStopped());
+}
+
+void ProducerConsumer::Lock::confirm() {
+ checkOk();
+ status = CONFIRMED;
+}
+
+void ProducerConsumer::Lock::cancel() {
+ checkOk();
+ status = CANCELLED;
+}
+
+// ================ ProducerLock
+
+ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
+{}
+
+
+ProducerConsumer::ProducerLock::~ProducerLock() {
+ if (status == CONFIRMED) {
+ pc.items++;
+ pc.monitor.notify(); // Notify a consumer.
+ }
+}
+
+// ================ ConsumerLock
+
+ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
+{
+ if (isOk()) {
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ pc.monitor.wait();
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::ConsumerLock(
+ ProducerConsumer& p, const Time& timeout) : Lock(p)
+{
+ if (isOk()) {
+ // Don't wait if timeout==0
+ if (timeout == 0) {
+ if (pc.items == 0)
+ status = TIMEOUT;
+ return;
+ }
+ else {
+ Time deadline = now() + timeout;
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ if (!pc.monitor.wait(deadline)) {
+ status = TIMEOUT;
+ return;
+ }
+ }
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::~ConsumerLock() {
+ if (pc.isStopped()) {
+ if (pc.waiters == 0)
+ pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s)
+ }
+ else if (status==CONFIRMED) {
+ pc.items--;
+ if (pc.items > 0)
+ pc.monitor.notify(); // Notify another consumer.
+ }
+}
+
+
+}} // namespace qpid::sys
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h Wed Feb 14 07:02:10 2007
@@ -0,0 +1,165 @@
+#ifndef _sys_ProducerConsumer_h
+#define _sys_ProducerConsumer_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include "Exception.h"
+#include "sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Producer-consumer synchronisation.
+ *
+ * Producers increase the number of available items, consumers reduce it.
+ * Consumers wait till an item is available. Waiting threads can be
+ * woken for shutdown using stop().
+ *
+ * Note: Currently implements unbounded producer-consumer, i.e. no limit
+ * to available items, producers never block. Can be extended to support
+ * bounded PC if required.
+ *
+ // TODO aconway 2007-02-13: example, from tests.
+*/
+class ProducerConsumer
+{
+ public:
+ ProducerConsumer(size_t init_items=0);
+
+ ~ProducerConsumer() { stop(); }
+
+ /**
+ * Wake any threads waiting for ProducerLock or ConsumerLock.
+ *@post No threads are waiting in Producer or Consumer locks.
+ */
+ void stop();
+
+ /** True if queue is stopped */
+ bool isStopped() { return stopped; }
+
+ /** Number of items available for consumers */
+ size_t available() const;
+
+ /** Number of consumers waiting for items */
+ size_t consumers() const;
+
+ /** True if available == 0 */
+ bool empty() const { return available() == 0; }
+
+ /**
+ * Base class for producer and consumer locks.
+ */
+ class Lock : private boost::noncopyable {
+ public:
+
+ /**
+ * You must call isOk() after creating a lock to verify its state.
+ *
+ *@return true means the lock succeeded. You MUST call either
+ *confirm() or cancel() before the lock goes out of scope.
+ *
+ * false means the lock failed - timed out or the
+ * ProducerConsumer is stopped. You should not do anything in
+ * the scope of the lock.
+ */
+ bool isOk() const;
+
+ /**
+ * Confirm that an item was produced/consumed.
+ *@pre isOk()
+ */
+ void confirm();
+
+ /**
+ * Cancel the lock to indicate nothing was produced/consumed.
+ * Note that locks are not actually released until destroyed.
+ *
+ *@pre isOk()
+ */
+ void cancel();
+
+ /** True if this lock experienced a timeout */
+ bool isTimedOut() const { return status == TIMEOUT; }
+
+ /** True if we have been stopped */
+ bool isStopped() const { return pc.isStopped(); }
+
+ ProducerConsumer& pc;
+
+ protected:
+ /** Lock status */
+ enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
+
+ Lock(ProducerConsumer& p);
+ ~Lock();
+ void checkOk() const;
+ Mutex::ScopedLock lock;
+ Status status;
+ };
+
+ /** Lock for code that produces items. */
+ struct ProducerLock : public Lock {
+ /**
+ * Acquire locks to produce an item.
+ *@post If isOk() the calling thread has exclusive access
+ * to produce an item.
+ */
+ ProducerLock(ProducerConsumer& p);
+
+ /** Release locks, signal waiting consumers if confirm() was called. */
+ ~ProducerLock();
+ };
+
+ /** Lock for code that consumes items */
+ struct ConsumerLock : public Lock {
+ /**
+ * Wait for an item to consume and acquire locks.
+ *
+ *@post If isOk() there is at least one item available and the
+ *calling thread has exclusive access to consume it.
+ */
+ ConsumerLock(ProducerConsumer& p);
+
+ /**
+ * Wait up to timeout to acquire lock.
+ *@post If isOk() caller has a producer lock.
+ * If isTimedOut() there was a timeout.
+ * If neither then we were stopped.
+ */
+ ConsumerLock(ProducerConsumer& p, const Time& timeout);
+
+ /** Release locks */
+ ~ConsumerLock();
+ };
+
+ private:
+ mutable Monitor monitor;
+ size_t items;
+ size_t waiters;
+ bool stopped;
+
+ friend class Lock;
+ friend class ProducerLock;
+ friend class ConsumerLock;
+};
+
+}} // namespace qpid::sys
+
+#endif /*!_sys_ProducerConsumer_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ProducerConsumer.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h Wed Feb 14 07:02:10 2007
@@ -0,0 +1,59 @@
+#ifndef _posix_ScopedIncrement_h
+#define _posix_ScopedIncrement_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/** Increment counter in constructor and decrement in destructor. */
+template <class T>
+class ScopedIncrement : boost::noncopyable
+{
+ public:
+ ScopedIncrement(T& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ T& count;
+};
+
+
+/** Decrement counter in constructor and increment in destructor. */
+template <class T>
+class ScopedDecrement : boost::noncopyable
+{
+ public:
+ ScopedDecrement(T& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+
+ /** Return the value after the decrement. */
+ operator long() { return value; }
+
+ private:
+ T& count;
+ long value;
+};
+
+
+}}
+
+
+#endif // _posix_ScopedIncrement_h
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ScopedIncrement.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h Wed Feb 14 07:02:10 2007
@@ -26,6 +26,7 @@
#include "broker/Broker.h"
#include "broker/Connection.h"
#include "client/Connector.h"
+#include "client/Connection.h"
namespace qpid {
namespace broker {
@@ -54,6 +55,7 @@
class InProcessBroker : public client::Connector {
public:
enum Sender {CLIENT,BROKER};
+
struct Frame : public framing::AMQFrame {
Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
bool fromBroker() const { return from == BROKER; }
@@ -68,7 +70,7 @@
};
typedef std::vector<Frame> Conversation;
- InProcessBroker(const framing::ProtocolVersion& ver) :
+ InProcessBroker(framing::ProtocolVersion ver) :
Connector(ver),
protocolInit(ver),
broker(broker::Broker::create()),
@@ -77,6 +79,8 @@
clientOut(CLIENT, conversation, &brokerConnection)
{}
+ ~InProcessBroker() { broker->shutdown(); }
+
void connect(const std::string& /*host*/, int /*port*/) {}
void init() { brokerConnection.initiated(&protocolInit); }
void close() {}
@@ -141,157 +145,22 @@
}} // namespace qpid::broker
-#endif /*!_tests_InProcessBroker_h*/
-#ifndef _tests_InProcessBroker_h
-#define _tests_InProcessBroker_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <vector>
-#include <iostream>
-#include <algorithm>
-
-#include "framing/AMQFrame.h"
-#include "broker/Broker.h"
-#include "broker/Connection.h"
-#include "client/Connector.h"
-
-namespace qpid {
-namespace broker {
-
-/** Make a copy of a frame body. Inefficient, only intended for tests. */
-// TODO aconway 2007-01-29: from should be const, need to fix
-// AMQPFrame::encode as const.
-framing::AMQFrame copy(framing::AMQFrame& from) {
- framing::Buffer buffer(from.size());
- from.encode(buffer);
- buffer.flip();
- framing::AMQFrame result;
- result.decode(buffer);
- return result;
-}
-
-/**
- * A broker that implements client::Connector allowing direct
- * in-process connection of client to broker. Used to write round-trip
- * tests without requiring an external broker process.
- *
- * Also allows you to "snoop" on frames exchanged between client & broker.
- *
- * Use as follows:
- *
- \code
- broker::InProcessBroker ibroker(version);
- client::Connection clientConnection;
- clientConnection.setConnector(ibroker);
- clientConnection.open("");
- ... use as normal
- \endcode
- *
- */
-class InProcessBroker : public client::Connector {
+/** An in-process client+broker all in one. */
+class InProcessBrokerClient : public qpid::client::Connection {
public:
- enum Sender {CLIENT,BROKER};
- struct Frame : public framing::AMQFrame {
- Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
- bool fromBroker() const { return from == BROKER; }
- bool fromClient() const { return from == CLIENT; }
-
- template <class MethodType>
- MethodType* asMethod() {
- return dynamic_cast<MethodType*>(getBody().get());
- }
-
- Sender from;
- };
- typedef std::vector<Frame> Conversation;
-
- InProcessBroker(const framing::ProtocolVersion& ver) :
- Connector(ver),
- protocolInit(ver),
- broker(broker::Broker::create()),
- brokerOut(BROKER, conversation),
- brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT, conversation, &brokerConnection)
- {}
-
- void connect(const std::string& /*host*/, int /*port*/) {}
- void init() { brokerConnection.initiated(&protocolInit); }
- void close() {}
-
- /** Client's input handler. */
- void setInputHandler(framing::InputHandler* handler) {
- brokerOut.in = handler;
+ qpid::broker::InProcessBroker broker;
+
+ /** Constructor creates broker and opens client connection. */
+ InProcessBrokerClient(qpid::framing::ProtocolVersion version)
+ : broker(version)
+ {
+ setConnector(broker);
+ open("");
}
- /** Called by client to send a frame */
- void send(framing::AMQFrame* frame) {
- clientOut.send(frame);
+ ~InProcessBrokerClient() {
+ close(); // close before broker is deleted.
}
-
- /** Entire client-broker conversation is recorded here */
- Conversation conversation;
-
- private:
- /** OutputHandler that forwards data to an InputHandler */
- struct OutputToInputHandler : public sys::ConnectionOutputHandler {
- OutputToInputHandler(
- Sender from_, Conversation& conversation_,
- framing::InputHandler* ih=0
- ) : from(from_), conversation(conversation_), in(ih) {}
-
- void send(framing::AMQFrame* frame) {
- conversation.push_back(Frame(from, copy(*frame)));
- in->received(frame);
- }
-
- void close() {}
-
- Sender from;
- Conversation& conversation;
- framing::InputHandler* in;
- };
-
- framing::ProtocolInitiation protocolInit;
- Broker::shared_ptr broker;
- OutputToInputHandler brokerOut;
- broker::Connection brokerConnection;
- OutputToInputHandler clientOut;
};
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Frame& frame)
-{
- return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
- static_cast<const framing::AMQFrame&>(frame);
-}
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Conversation& conv)
-{
- for (InProcessBroker::Conversation::const_iterator i = conv.begin();
- i != conv.end(); ++i)
- {
- out << *i << std::endl;
- }
- return out;
-}
-
-
-}} // namespace qpid::broker
-
-#endif /*!_tests_InProcessBroker_h*/
+#endif // _tests_InProcessBroker_h
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=507560&r1=507559&r2=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Wed Feb 14 07:02:10 2007
@@ -47,14 +47,15 @@
HeaderTest
misc_tests = \
- ExceptionTest
+ ExceptionTest \
+ ProducerConsumerTest
posix_tests = \
EventChannelTest \
EventChannelThreadsTest
unit_tests = \
- $(broker_tests) \
+b $(broker_tests) \
$(framing_tests) \
$(misc_tests) \
$(round_trip_tests)
Added: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp?view=auto&rev=507560
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp Wed Feb 14 07:02:10 2007
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+#include <qpid_test_plugin.h>
+#include "InProcessBroker.h"
+#include "sys/ProducerConsumer.h"
+#include "sys/Thread.h"
+#include "AMQP_HighestVersion.h"
+#include "sys/AtomicCount.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace boost;
+using namespace std;
+
+/** A counter that notifies a monitor when changed */
+class WatchedCounter : public Monitor {
+ public:
+ WatchedCounter(int i=0) : count(i) {}
+ WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
+
+ WatchedCounter& operator=(const WatchedCounter& x) {
+ return *this = int(x);
+ }
+
+ WatchedCounter& operator=(int i) {
+ Lock l(*this);
+ count = i;
+ return *this;
+ }
+
+ int operator++() {
+ Lock l(*this);
+ notifyAll();
+ return ++count;
+ }
+
+ int operator++(int) {
+ Lock l(*this);
+ notifyAll();
+ return count++;
+ }
+
+ bool operator==(int i) const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return i == count;
+ }
+
+ operator int() const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return count;
+ }
+
+ bool waitFor(int i, Time timeout=TIME_SEC) {
+ Lock l(*this);
+ Time deadline = timeout+now();
+ while (count != i) {
+ if (!wait(deadline))
+ return false;
+ }
+ assert(count == i);
+ return true;
+ }
+
+ private:
+ typedef Mutex::ScopedLock Lock;
+ int count;
+};
+
+class ProducerConsumerTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ProducerConsumerTest);
+ CPPUNIT_TEST(testProduceConsume);
+ CPPUNIT_TEST(testTimeout);
+ CPPUNIT_TEST(testStop);
+ CPPUNIT_TEST(testCancel);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ InProcessBrokerClient client;
+ ProducerConsumer pc;
+
+ WatchedCounter stopped;
+ WatchedCounter timeout;
+ WatchedCounter consumed;
+ WatchedCounter produced;
+
+ struct ConsumeRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
+ void run() { test.consume(); }
+ };
+
+ struct ConsumeTimeoutRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ Time timeout;
+ ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t)
+ : test(test_), timeout(t) {}
+ void run() { test.consumeTimeout(timeout); }
+ };
+
+
+ void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
+ if (pc.isStopped()) {
+ ++stopped;
+ return;
+ }
+ if (consumer.isTimedOut()) {
+ ++timeout;
+ return;
+ }
+ CPPUNIT_ASSERT(consumer.isOk());
+ CPPUNIT_ASSERT(pc.available() > 0);
+ consumer.confirm();
+ consumed++;
+ }
+
+ void consume() {
+ ProducerConsumer::ConsumerLock consumer(pc);
+ consumeInternal(consumer);
+ };
+
+ void consumeTimeout(const Time& timeout) {
+ ProducerConsumer::ConsumerLock consumer(pc, timeout);
+ consumeInternal(consumer);
+ };
+
+ void produce() {
+ ProducerConsumer::ProducerLock producer(pc);
+ CPPUNIT_ASSERT(producer.isOk());
+ producer.confirm();
+ produced++;
+ }
+
+ void join(vector<Thread>& threads) {
+ for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
+ }
+
+ vector<Thread> startThreads(size_t n, Runnable& runnable) {
+ vector<Thread> threads(n);
+ while (n > 0)
+ threads[--n] = Thread(runnable);
+ return threads;
+ }
+
+public:
+ ProducerConsumerTest() : client(highestProtocolVersion) {}
+
+ void testProduceConsume() {
+ ConsumeRunnable runMe(*this);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(2));
+ vector<Thread> threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+
+ threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(2));
+ join(threads);
+
+ threads = startThreads(3, runMe);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(4));
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(5));
+ join(threads);
+ CPPUNIT_ASSERT_EQUAL(0, int(stopped));
+ }
+
+ void testTimeout() {
+ try {
+ // 0 timeout no items available throws exception
+ ProducerConsumer::ConsumerLock consumer(pc, 0);
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...){}
+
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(1));
+ CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
+ {
+ // 0 timeout succeeds if there's an item available.
+ ProducerConsumer::ConsumerLock consume(pc, 0);
+ CPPUNIT_ASSERT(consume.isOk());
+ consume.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
+
+ // Produce an item within the timeout.
+ ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
+ vector<Thread> threads = startThreads(1, runMe);
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+ }
+
+
+ void testStop() {
+ ConsumeRunnable runMe(*this);
+ vector<Thread> threads = startThreads(2, runMe);
+ while (pc.consumers() != 2)
+ Thread::yield();
+ pc.stop();
+ CPPUNIT_ASSERT(stopped.waitFor(2));
+ join(threads);
+
+ threads = startThreads(1, runMe); // Should stop immediately.
+ CPPUNIT_ASSERT(stopped.waitFor(3));
+ join(threads);
+
+ // Produce/consume while stopped should return isStopped and
+ // throw on confirm.
+ try {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ try {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ }
+
+ void testCancel() {
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.cancel();
+ }
+ // Nothing was produced.
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc, 0);
+ CPPUNIT_ASSERT(c.isTimedOut());
+ }
+ // Now produce but cancel the consume
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(c.isOk());
+ c.cancel();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
+
Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ProducerConsumerTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date