You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/14 15:55:18 UTC
svn commit: r944261 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/
src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/
Author: aconway
Date: Fri May 14 13:55:18 2010
New Revision: 944261
URL: http://svn.apache.org/viewvc?rev=944261&view=rev
Log:
Initial multi-thread unit test for messaging API.
- added Receiver::isClosed() to test for local close.
Added:
qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Fri May 14 13:55:18 2010
@@ -49,8 +49,7 @@ class Receiver : public qpid::messaging:
/**
* Retrieves a message from this receivers local queue, or waits
* for upto the specified timeout for a message to become
- * available. Returns false if there is no message to give after
- * waiting for the specified timeout.
+ * available.
*/
QPID_CLIENT_EXTERN bool get(Message& message, Duration timeout=Duration::FOREVER);
/**
@@ -59,7 +58,8 @@ class Receiver : public qpid::messaging:
* available.
*
* @exception NoMessageAvailable if there is no message to give
- * after waiting for the specified timeout.
+ * after waiting for the specified timeout, or if the Receiver is
+ * closed, in which case isClosed() will be true.
*/
QPID_CLIENT_EXTERN Message get(Duration timeout=Duration::FOREVER);
/**
@@ -68,6 +68,10 @@ class Receiver : public qpid::messaging:
* available. Unlike get() this method will check with the server
* that there is no message for the subscription this receiver is
* serving before returning false.
+ *
+ * @return false if there is no message to give after
+ * waiting for the specified timeout, or if the Receiver is
+ * closed, in which case isClosed() will be true.
*/
QPID_CLIENT_EXTERN bool fetch(Message& message, Duration timeout=Duration::FOREVER);
/**
@@ -78,7 +82,8 @@ class Receiver : public qpid::messaging:
* serving before throwing an exception.
*
* @exception NoMessageAvailable if there is no message to give
- * after waiting for the specified timeout.
+ * after waiting for the specified timeout, or if the Receiver is
+ * closed, in which case isClosed() will be true.
*/
QPID_CLIENT_EXTERN Message fetch(Duration timeout=Duration::FOREVER);
/**
@@ -88,19 +93,19 @@ class Receiver : public qpid::messaging:
*/
QPID_CLIENT_EXTERN void setCapacity(uint32_t);
/**
- * Returns the capacity of the receiver. The capacity determines
+ * @return the capacity of the receiver. The capacity determines
* how many incoming messages can be held in the receiver before
* being requested by a client via fetch() (or pushed to a
* listener).
*/
QPID_CLIENT_EXTERN uint32_t getCapacity();
/**
- * Returns the number of messages received and waiting to be
+ * @return the number of messages received and waiting to be
* fetched.
*/
QPID_CLIENT_EXTERN uint32_t getAvailable();
/**
- * Returns a count of the number of messages received on this
+ * @return a count of the number of messages received on this
* receiver that have been acknowledged, but for which that
* acknowledgement has not yet been confirmed as processed by the
* server.
@@ -113,6 +118,11 @@ class Receiver : public qpid::messaging:
QPID_CLIENT_EXTERN void close();
/**
+ * @return true if the receiver was closed by a call to close().
+ */
+ QPID_CLIENT_EXTERN bool isClosed() const;
+
+ /**
* Returns the name of this receiver.
*/
QPID_CLIENT_EXTERN const std::string& getName() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Fri May 14 13:55:18 2010
@@ -193,6 +193,13 @@ void ReceiverImpl::closeImpl()
}
}
+bool ReceiverImpl::isClosed() const { // Reports whether this receiver's state is CANCELLED.
+ sys::Mutex::ScopedLock l(lock); // hold the receiver's mutex while state is read
+ return state == CANCELLED;
+}
+
+
+
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
sys::Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Fri May 14 13:55:18 2010
@@ -65,6 +65,8 @@ class ReceiverImpl : public qpid::messag
uint32_t getUnsettled();
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
+ bool isClosed() const;
+
private:
mutable sys::Mutex lock;
boost::intrusive_ptr<SessionImpl> parent;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Fri May 14 13:55:18 2010
@@ -44,4 +44,5 @@ uint32_t Receiver::getUnsettled() { retu
void Receiver::close() { impl->close(); }
const std::string& Receiver::getName() const { return impl->getName(); }
Session Receiver::getSession() const { return impl->getSession(); }
+bool Receiver::isClosed() const { return impl->isClosed(); }
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Fri May 14 13:55:18 2010
@@ -45,6 +45,7 @@ class ReceiverImpl : public virtual qpid
virtual void close() = 0;
virtual const std::string& getName() const = 0;
virtual Session getSession() const = 0;
+ virtual bool isClosed() const = 0;
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri May 14 13:55:18 2010
@@ -68,6 +68,8 @@ unit_test_LDADD=-lboost_unit_test_framew
unit_test_SOURCES= unit_test.cpp unit_test.h \
MessagingSessionTests.cpp \
+ MessagingThreadTests.cpp \
+ MessagingFixture.h \
ClientSessionTest.cpp \
BrokerFixture.h SocketProxy.h \
exception_test.cpp \
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h Fri May 14 13:55:18 2010
@@ -182,7 +182,7 @@ struct MultiQueueFixture : MessagingFixt
};
-std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
+inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
{
std::vector<std::string> data;
messaging::Message message;
@@ -193,7 +193,7 @@ std::vector<std::string> fetch(messaging
}
-void send(messaging::Sender& sender, uint count = 1, uint start = 1,
+inline void send(messaging::Sender& sender, uint count = 1, uint start = 1,
const std::string& base = "Message")
{
for (uint i = start; i < start + count; ++i) {
@@ -201,7 +201,7 @@ void send(messaging::Sender& sender, uin
}
}
-void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
+inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
const std::string& base = "Message",
messaging::Duration timeout=messaging::Duration::SECOND*5)
{
Added: qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp?rev=944261&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp Fri May 14 13:55:18 2010
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessagingFixture.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace tests {
+QPID_AUTO_TEST_SUITE(MessagingThreadTests)
+
+using namespace messaging;
+using namespace boost::assign;
+using namespace std;
+
+struct ReceiveThread : public sys::Runnable { // Runnable that fetches from a Receiver until a message with content "END" arrives.
+ Receiver receiver; // the receiver that run() fetches from
+ vector<string> received; // content of each message fetched before "END", in fetch order
+ string error; // what() of the failure that ended run(); stays empty on "END" or when the receiver was closed
+
+ ReceiveThread(Receiver s) : receiver(s) {}
+ void run() { // Fetch loop; exceptions derived from std::exception are recorded in error instead of propagating.
+ try {
+ while(true) {
+ Message m = receiver.fetch(Duration::SECOND*5); // wait up to 5 seconds per message
+ if (m.getContent() == "END") break; // sentinel content: stop without recording it
+ received.push_back(m.getContent());
+ }
+ } catch (const NoMessageAvailable& e) {
+ // fetch() either timed out or the receiver was closed (e.g. by another thread).
+ if (!receiver.isClosed()) // not closed, so this was a timeout: report it
+ error = e.what();
+ } catch (const std::exception& e) {
+ error = e.what(); // any other failure is always reported
+ }
+ }
+};
+
+QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { // Sends COUNT messages plus "END" while a ReceiveThread fetches from the same queue.
+ QueueFixture fix;
+ Sender s = fix.session.createSender(fix.queue);
+ Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}");
+ ReceiveThread rt(r);
+ sys::Thread thread(rt); // presumably starts rt.run() on a new thread -- confirm against sys::Thread
+ const size_t COUNT=1000;
+ for (size_t i = 0; i < COUNT; ++i) {
+ s.send(Message()); // default-constructed message
+ }
+ s.send(Message("END")); // sentinel that makes ReceiveThread::run() leave its loop
+ thread.join(); // rt is only inspected after the receiving thread has finished
+ BOOST_CHECK_EQUAL(rt.error, string()); // no timeout or other exception was recorded
+ BOOST_CHECK_EQUAL(COUNT, rt.received.size()); // every message sent before "END" was fetched
+}
+
+QPID_AUTO_TEST_CASE(testCloseBusyReceiver) { // Closes a receiver that a ReceiveThread is using and expects no error to be recorded.
+ QueueFixture fix;
+ Receiver r = fix.session.createReceiver(fix.queue);
+ ReceiveThread rt(r);
+ sys::Thread thread(rt); // presumably starts rt.run() on a new thread -- confirm against sys::Thread
+ r.close(); // close from this thread; no message was ever sent to the queue
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string()); // NoMessageAvailable on a closed receiver is not recorded as an error
+
+ // Check both fetch() overloads on the now-closed receiver.
+ Message m;
+ BOOST_CHECK(!r.fetch(m, Duration(0))); // bool overload reports failure by returning false
+ BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); // Message overload reports it by throwing
+}
+
+QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) { // Closes the session while a ReceiveThread is using one of its receivers.
+ QueueFixture fix;
+ Receiver r = fix.session.createReceiver(fix.queue);
+ ReceiveThread rt(r);
+ sys::Thread thread(rt); // presumably starts rt.run() on a new thread -- confirm against sys::Thread
+ fix.session.close(); // close the whole session rather than just the receiver
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string()); // expects run() to end without recording an error
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org