You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/14 15:55:18 UTC

svn commit: r944261 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: aconway
Date: Fri May 14 13:55:18 2010
New Revision: 944261

URL: http://svn.apache.org/viewvc?rev=944261&view=rev
Log:
Initial multi-thread unit test for messaging API.

-  added Receiver::isClosed() to test for local close.

Added:
    qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp
Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Fri May 14 13:55:18 2010
@@ -49,8 +49,7 @@ class Receiver : public qpid::messaging:
     /**
      * Retrieves a message from this receivers local queue, or waits
      * for upto the specified timeout for a message to become
-     * available. Returns false if there is no message to give after
-     * waiting for the specified timeout.
+     * available.
      */
     QPID_CLIENT_EXTERN bool get(Message& message, Duration timeout=Duration::FOREVER);
     /**
@@ -59,7 +58,8 @@ class Receiver : public qpid::messaging:
      * available.
      *
      * @exception NoMessageAvailable if there is no message to give
-     * after waiting for the specified timeout.
+     * after waiting for the specified timeout, or if the Receiver is
+     * closed, in which case isClose() will be true.
      */
     QPID_CLIENT_EXTERN Message get(Duration timeout=Duration::FOREVER);
     /**
@@ -68,6 +68,10 @@ class Receiver : public qpid::messaging:
      * available. Unlike get() this method will check with the server
      * that there is no message for the subscription this receiver is
      * serving before returning false.
+     *
+     * @return false if there is no message to give after
+     * waiting for the specified timeout, or if the Receiver is
+     * closed, in which case isClose() will be true.
      */
     QPID_CLIENT_EXTERN bool fetch(Message& message, Duration timeout=Duration::FOREVER);
     /**
@@ -78,7 +82,8 @@ class Receiver : public qpid::messaging:
      * serving before throwing an exception.
      *
      * @exception NoMessageAvailable if there is no message to give
-     * after waiting for the specified timeout.
+     * after waiting for the specified timeout, or if the Receiver is
+     * closed, in which case isClose() will be true.
      */
     QPID_CLIENT_EXTERN Message fetch(Duration timeout=Duration::FOREVER);
     /**
@@ -88,19 +93,19 @@ class Receiver : public qpid::messaging:
      */
     QPID_CLIENT_EXTERN void setCapacity(uint32_t);
     /**
-     * Returns the capacity of the receiver. The capacity determines
+     * @return the capacity of the receiver. The capacity determines
      * how many incoming messages can be held in the receiver before
      * being requested by a client via fetch() (or pushed to a
      * listener).
      */
     QPID_CLIENT_EXTERN uint32_t getCapacity();
     /**
-     * Returns the number of messages received and waiting to be
+     * @return the number of messages received and waiting to be
      * fetched.
      */
     QPID_CLIENT_EXTERN uint32_t getAvailable();
     /**
-     * Returns a count of the number of messages received on this
+     * @return a count of the number of messages received on this
      * receiver that have been acknowledged, but for which that
      * acknowledgement has not yet been confirmed as processed by the
      * server.
@@ -113,6 +118,11 @@ class Receiver : public qpid::messaging:
     QPID_CLIENT_EXTERN void close();
 
     /**
+     * Return true if the receiver was closed by a call to close()
+     */
+    QPID_CLIENT_EXTERN bool isClosed() const;
+
+    /**
      * Returns the name of this receiver.
      */
     QPID_CLIENT_EXTERN const std::string& getName() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Fri May 14 13:55:18 2010
@@ -193,6 +193,13 @@ void ReceiverImpl::closeImpl() 
     }
 }
 
+bool ReceiverImpl::isClosed() const {
+    sys::Mutex::ScopedLock l(lock);
+    return state == CANCELLED;
+}
+
+
+
 void ReceiverImpl::setCapacityImpl(uint32_t c)
 {
     sys::Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Fri May 14 13:55:18 2010
@@ -65,6 +65,8 @@ class ReceiverImpl : public qpid::messag
     uint32_t getUnsettled();
     void received(qpid::messaging::Message& message);
     qpid::messaging::Session getSession() const;
+    bool isClosed() const;
+
   private:
     mutable sys::Mutex lock;
     boost::intrusive_ptr<SessionImpl> parent;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Fri May 14 13:55:18 2010
@@ -44,4 +44,5 @@ uint32_t Receiver::getUnsettled() { retu
 void Receiver::close() { impl->close(); }
 const std::string& Receiver::getName() const { return impl->getName(); }
 Session Receiver::getSession() const { return impl->getSession(); }
+bool Receiver::isClosed() const { return impl->isClosed(); }
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Fri May 14 13:55:18 2010
@@ -45,6 +45,7 @@ class ReceiverImpl : public virtual qpid
     virtual void close() = 0;
     virtual const std::string& getName() const = 0;
     virtual Session getSession() const = 0;
+    virtual bool isClosed() const = 0;
 };
 }} // namespace qpid::messaging
 

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri May 14 13:55:18 2010
@@ -68,6 +68,8 @@ unit_test_LDADD=-lboost_unit_test_framew
 
 unit_test_SOURCES= unit_test.cpp unit_test.h \
 	MessagingSessionTests.cpp \
+	MessagingThreadTests.cpp \
+	MessagingFixture.h \
 	ClientSessionTest.cpp \
 	BrokerFixture.h SocketProxy.h \
 	exception_test.cpp \

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h?rev=944261&r1=944260&r2=944261&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h Fri May 14 13:55:18 2010
@@ -182,7 +182,7 @@ struct MultiQueueFixture : MessagingFixt
 
 };
 
-std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
+inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
 {
     std::vector<std::string> data;
     messaging::Message message;
@@ -193,7 +193,7 @@ std::vector<std::string> fetch(messaging
 }
 
 
-void send(messaging::Sender& sender, uint count = 1, uint start = 1,
+inline void send(messaging::Sender& sender, uint count = 1, uint start = 1,
           const std::string& base = "Message")
 {
     for (uint i = start; i < start + count; ++i) {
@@ -201,7 +201,7 @@ void send(messaging::Sender& sender, uin
     }
 }
 
-void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
+inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
              const std::string& base = "Message",
              messaging::Duration timeout=messaging::Duration::SECOND*5)
 {

Added: qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp?rev=944261&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingThreadTests.cpp Fri May 14 13:55:18 2010
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessagingFixture.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace tests {
+QPID_AUTO_TEST_SUITE(MessagingThreadTests)
+
+using namespace messaging;
+using namespace boost::assign;
+using namespace std;
+
+struct ReceiveThread : public sys::Runnable {
+    Receiver receiver;
+    vector<string> received;
+    string error;
+
+    ReceiveThread(Receiver s) : receiver(s) {}
+    void run() {
+        try {
+            while(true) {
+                Message m = receiver.fetch(Duration::SECOND*5);
+                if (m.getContent() == "END") break;
+                received.push_back(m.getContent());
+            }
+        } catch (const NoMessageAvailable& e) {
+            // Indicates that fetch timed out OR receiver was closed by other thread.
+            if (!receiver.isClosed()) // timeout
+                error = e.what();
+        } catch (const std::exception& e) {
+            error = e.what();
+        }
+    }
+};
+
+QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
+    QueueFixture fix;
+    Sender s = fix.session.createSender(fix.queue);
+    Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}");
+    ReceiveThread rt(r);
+    sys::Thread thread(rt);
+    const size_t COUNT=1000;
+    for (size_t i = 0; i < COUNT; ++i) {
+        s.send(Message());
+    }
+    s.send(Message("END"));
+    thread.join();
+    BOOST_CHECK_EQUAL(rt.error, string());
+    BOOST_CHECK_EQUAL(COUNT, rt.received.size());
+}
+
+QPID_AUTO_TEST_CASE(testCloseBusyReceiver) {
+    QueueFixture fix;
+    Receiver r = fix.session.createReceiver(fix.queue);
+    ReceiveThread rt(r);
+    sys::Thread thread(rt);
+    r.close();
+    thread.join();
+    BOOST_CHECK_EQUAL(rt.error, string());
+
+    // Check that using a closed receiver gives the right result.
+    Message m;
+    BOOST_CHECK(!r.fetch(m, Duration(0)));
+    BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
+}
+
+QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) {
+    QueueFixture fix;
+    Receiver r = fix.session.createReceiver(fix.queue);
+    ReceiveThread rt(r);
+    sys::Thread thread(rt);
+    fix.session.close();
+    thread.join();
+    BOOST_CHECK_EQUAL(rt.error, string());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org