You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [15/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp Fri Oct 21 14:42:12 2011
@@ -19,6 +19,11 @@
*
*/
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/windows/check.h"
@@ -26,50 +31,204 @@
#include <process.h>
#include <windows.h>
-namespace {
-unsigned __stdcall runRunnable(void* p)
-{
- static_cast<qpid::sys::Runnable*>(p)->run();
- _endthreadex(0);
- return 0;
-}
-}
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest. It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not. Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure. Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * TLS storage is used for a lockless solution for static library
+ * builds. The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain. _DLL marks the dynamic case.
+ */
namespace qpid {
namespace sys {
class ThreadPrivate {
+public:
friend class Thread;
+ friend unsigned __stdcall runThreadPrivate(void*);
+ typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+ ~ThreadPrivate();
- HANDLE threadHandle;
+private:
unsigned threadId;
-
- ThreadPrivate(Runnable* runnable) {
- uintptr_t h = _beginthreadex(0,
- 0,
- runRunnable,
- runnable,
- 0,
- &threadId);
- QPID_WINDOWS_CHECK_CRT_NZ(h);
- threadHandle = reinterpret_cast<HANDLE>(h);
+ HANDLE threadHandle;
+ HANDLE initCompleted;
+ HANDLE qpidThreadDone;
+ Runnable* runnable;
+ shared_ptr keepAlive;
+
+ ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(NULL) {
+ threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId);
+ QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
}
-
- ThreadPrivate()
- : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+
+ ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(r) {}
+
+ void start(shared_ptr& p);
+ static shared_ptr createThread(Runnable* r);
};
+}} // namespace qpid::sys
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+ public:
+ ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+ ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+ private:
+ CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ return tlsIndex; // already set
+
+ DWORD trialIndex = TlsAlloc();
+ QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+
+ // only one thread gets to set the value
+ DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+ if (actualIndex == TLS_OUT_OF_INDEXES)
+ return trialIndex; // we won the race
+ else {
+ TlsFree(trialIndex);
+ return actualIndex;
+ }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+ ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+ TlsSetValue(getTlsIndex(), threadPrivate);
+
+ WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+ CloseHandle (threadPrivate->initCompleted);
+ threadPrivate->initCompleted = NULL;
+
+ try {
+ threadPrivate->runnable->run();
+ } catch (...) {
+ // not our concern
+ }
+
+ SetEvent (threadPrivate->qpidThreadDone); // allow join()
+ threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+ return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+ ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+ tp->start(tp);
+ return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+ getTlsIndex(); // fail here if OS problem, not in new thread
+
+ initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+ qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (terminating)
+ throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+ runningThreads++;
+ }
+#endif
+
+ uintptr_t h = _beginthreadex(0,
+ 0,
+ runThreadPrivate,
+ (void *)this,
+ 0,
+ &threadId);
+
+#ifdef _DLL
+ if (h == NULL) {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+
+ QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+ // Success
+ keepAlive = tp;
+ threadHandle = reinterpret_cast<HANDLE>(h);
+ SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+ if (threadHandle)
+ CloseHandle (threadHandle);
+ if (initCompleted)
+ CloseHandle (initCompleted);
+ if (qpidThreadDone)
+ CloseHandle (qpidThreadDone);
+}
+
+
Thread::Thread() {}
-Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
-Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
Thread::operator bool() {
return impl;
}
bool Thread::operator==(const Thread& t) const {
+ if (!impl || !t.impl)
+ return false;
return impl->threadId == t.impl->threadId;
}
@@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t)
void Thread::join() {
if (impl) {
- DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+ DWORD status;
+ if (impl->runnable) {
+ HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+ // wait for either. threadHandle not signalled if loader
+ // lock held (FreeLibrary). qpidThreadDone not signalled
+ // if thread terminated by exit().
+ status = WaitForMultipleObjects (2, handles, false, INFINITE);
+ }
+ else
+ status = WaitForSingleObject (impl->threadHandle, INFINITE);
QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
- CloseHandle (impl->threadHandle);
- impl->threadHandle = 0;
}
}
@@ -92,9 +258,70 @@ unsigned long Thread::logId() {
/* static */
Thread Thread::current() {
+ ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
Thread t;
- t.impl.reset(new ThreadPrivate());
+ if (tlsValue != NULL) {
+ // called from within Runnable->run(), so keepAlive has positive use count
+ t.impl = tlsValue->keepAlive;
+ }
+ else
+ t.impl.reset(new ThreadPrivate());
return t;
}
-}} /* qpid::sys */
+}} // namespace qpid::sys
+
+
+#ifdef _DLL
+
+// DllMain: called possibly many times in a process lifetime if dll
+// loaded and freed repeatedly . Be mindful of Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&threadLock);
+ threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+ break;
+
+ case DLL_PROCESS_DETACH:
+ terminating = true;
+ if (reserved != NULL) {
+ // process exit(): threads are stopped arbitrarily and
+ // possibly in an inconsistent state. Not even threadLock
+ // can be trusted. All static destructors have been
+ // called at this point and any resources this unit knows
+ // about will be released as part of process tear down by
+ // the OS. Accordingly, do nothing.
+ return TRUE;
+ }
+ else {
+ // FreeLibrary(): threads are still running and we are
+ // encouraged to clean up to avoid leaks. Mostly we just
+ // want any straggler threads to finish and notify
+ // threadsDone as the last thing they do.
+ while (1) {
+ {
+ ScopedCriticalSection l(threadLock);
+ if (runningThreads == 0)
+ break;
+ ResetEvent(threadsDone);
+ }
+ WaitForSingleObject(threadsDone, INFINITE);
+ }
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ TlsFree(getTlsIndex());
+ CloseHandle(threadsDone);
+ DeleteCriticalSection(&threadLock);
+ }
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+
+#endif
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp Fri Oct 21 14:42:12 2011
@@ -27,6 +27,17 @@
using namespace boost::posix_time;
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start;
+double freq = 1.0;
+
+}
+
namespace qpid {
namespace sys {
@@ -91,10 +102,35 @@ void outputFormattedNow(std::ostream& o)
char time_string[100];
::time( &rawtime );
+#ifdef _MSC_VER
::localtime_s(&timeinfo, &rawtime);
+#else
+ timeinfo = *(::localtime(&rawtime));
+#endif
::strftime(time_string, 100,
"%Y-%m-%d %H:%M:%S",
&timeinfo);
o << time_string << " ";
}
+
+void outputHiresNow(std::ostream& o) {
+ if (!timeInitialized) {
+ start.QuadPart = 0;
+ LARGE_INTEGER iFreq;
+ iFreq.QuadPart = 1;
+ QueryPerformanceCounter(&start);
+ QueryPerformanceFrequency(&iFreq);
+ freq = static_cast<double>(iFreq.QuadPart);
+ timeInitialized = true;
+ }
+ LARGE_INTEGER iNow;
+ iNow.QuadPart = 0;
+ QueryPerformanceCounter(&iNow);
+ iNow.QuadPart -= start.QuadPart;
+ if (iNow.QuadPart < 0)
+ iNow.QuadPart = 0;
+ double now = static_cast<double>(iNow.QuadPart);
+ now /= freq; // now is seconds after this
+ o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+}
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp Fri Oct 21 14:42:12 2011
@@ -19,7 +19,7 @@
*
*/
-#include <Rpc.h>
+#include <rpc.h>
#ifdef uuid_t /* Done in rpcdce.h */
# undef uuid_t
#endif
@@ -52,7 +52,11 @@ int uuid_parse (const char *in, uuid_t u
void uuid_unparse (const uuid_t uu, char *out) {
unsigned char *formatted;
if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) {
+#ifdef _MSC_VER
strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE);
+#else
+ strncpy (out, (char*)formatted, 36+1);
+#endif
RpcStringFree(&formatted);
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
*/
#include "qpid/types/Uuid.h"
#include "qpid/sys/uuid.h"
+#include "qpid/sys/IntegerTypes.h"
#include <sstream>
#include <iostream>
#include <string.h>
@@ -71,7 +72,8 @@ void Uuid::clear()
// Force int 0/!0 to false/true; avoids compile warnings.
bool Uuid::isNull() const
{
- return !!uuid_is_null(bytes);
+ // This const cast is for Solaris which has non const arguments
+ return !!uuid_is_null(const_cast<uint8_t*>(bytes));
}
Uuid::operator bool() const { return !isNull(); }
@@ -86,7 +88,8 @@ const unsigned char* Uuid::data() const
bool operator==(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) == 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) == 0;
}
bool operator!=(const Uuid& a, const Uuid& b)
@@ -96,22 +99,26 @@ bool operator!=(const Uuid& a, const Uui
bool operator<(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) < 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) < 0;
}
bool operator>(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) > 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) > 0;
}
bool operator<=(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) <= 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) <= 0;
}
bool operator>=(const Uuid& a, const Uuid& b)
{
- return uuid_compare(a.bytes, b.bytes) >= 0;
+ // This const cast is for Solaris which has non const arguments
+ return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) >= 0;
}
ostream& operator<<(ostream& out, Uuid uuid)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp Fri Oct 21 14:42:12 2011
@@ -19,7 +19,6 @@
*
*/
#include "qpid/types/Variant.h"
-#include "qpid/Msg.h"
#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
@@ -108,15 +107,27 @@ class VariantImpl
} value;
std::string encoding;//optional encoding for variable length data
- std::string getTypeName(VariantType type) const;
template<class T> T convertFromString() const
{
std::string* s = reinterpret_cast<std::string*>(value.v);
- try {
- return boost::lexical_cast<T>(*s);
- } catch(const boost::bad_lexical_cast&) {
- throw InvalidConversion(QPID_MSG("Cannot convert " << *s));
+ if (std::numeric_limits<T>::is_signed || s->find('-') != 0) {
+ //lexical_cast won't fail if string is a negative number and T is unsigned
+ try {
+ return boost::lexical_cast<T>(*s);
+ } catch(const boost::bad_lexical_cast&) {
+ //don't return, throw exception below
+ }
+ } else {
+ //T is unsigned and number starts with '-'
+ try {
+ //handle special case of negative zero
+ if (boost::lexical_cast<int>(*s) == 0) return 0;
+ //else its a non-zero negative number so throw exception at end of function
+ } catch(const boost::bad_lexical_cast&) {
+ //wasn't a valid int, therefore not a valid uint
+ }
}
+ throw InvalidConversion(QPID_MSG("Cannot convert " << *s));
}
};
@@ -370,11 +381,11 @@ int8_t VariantImpl::asInt8() const
return int8_t(value.ui16);
break;
case VAR_UINT32:
- if (value.ui32 <= (uint) std::numeric_limits<int8_t>::max())
+ if (value.ui32 <= (uint32_t) std::numeric_limits<int8_t>::max())
return int8_t(value.ui32);
break;
case VAR_UINT64:
- if (value.ui64 <= (uint) std::numeric_limits<int8_t>::max())
+ if (value.ui64 <= (uint64_t) std::numeric_limits<int8_t>::max())
return int8_t(value.ui64);
break;
case VAR_STRING: return convertFromString<int8_t>();
@@ -401,11 +412,11 @@ int16_t VariantImpl::asInt16() const
return int16_t(value.ui16);
break;
case VAR_UINT32:
- if (value.ui32 <= (uint) std::numeric_limits<int16_t>::max())
+ if (value.ui32 <= (uint32_t) std::numeric_limits<int16_t>::max())
return int16_t(value.ui32);
break;
case VAR_UINT64:
- if (value.ui64 <= (uint) std::numeric_limits<int16_t>::max())
+ if (value.ui64 <= (uint64_t) std::numeric_limits<int16_t>::max())
return int16_t(value.ui64);
break;
case VAR_STRING: return convertFromString<int16_t>();
@@ -430,7 +441,7 @@ int32_t VariantImpl::asInt32() const
return int32_t(value.ui32);
break;
case VAR_UINT64:
- if (value.ui64 <= (uint32_t) std::numeric_limits<int32_t>::max())
+ if (value.ui64 <= (uint64_t) std::numeric_limits<int32_t>::max())
return int32_t(value.ui64);
break;
case VAR_STRING: return convertFromString<int32_t>();
@@ -582,7 +593,7 @@ const std::string& VariantImpl::getStrin
void VariantImpl::setEncoding(const std::string& s) { encoding = s; }
const std::string& VariantImpl::getEncoding() const { return encoding; }
-std::string VariantImpl::getTypeName(VariantType type) const
+std::string getTypeName(VariantType type)
{
switch (type) {
case VAR_VOID: return "void";
Modified: qpid/branches/QPID-2519/cpp/src/replication.mk
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/replication.mk?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/replication.mk (original)
+++ qpid/branches/QPID-2519/cpp/src/replication.mk Fri Oct 21 14:42:12 2011
@@ -19,14 +19,14 @@
# Make file for building two plugins for asynchronously replicating
# queues.
-dmodule_LTLIBRARIES += replicating_listener.la replication_exchange.la
+dmoduleexec_LTLIBRARIES += replicating_listener.la replication_exchange.la
# a queue event listener plugin that creates messages on a replication
# queue corresponding to enqueue and dequeue events:
replicating_listener_la_SOURCES = \
qpid/replication/constants.h \
qpid/replication/ReplicatingEventListener.cpp \
- qpid/replication/ReplicatingEventListener.h
+ qpid/replication/ReplicatingEventListener.h
replicating_listener_la_LIBADD = libqpidbroker.la
if SUNOS
@@ -41,7 +41,7 @@ replicating_listener_la_LDFLAGS = $(PLUG
replication_exchange_la_SOURCES = \
qpid/replication/constants.h \
qpid/replication/ReplicationExchange.cpp \
- qpid/replication/ReplicationExchange.h
+ qpid/replication/ReplicationExchange.h
replication_exchange_la_LIBADD = libqpidbroker.la
Modified: qpid/branches/QPID-2519/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/ssl.mk?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/ssl.mk (original)
+++ qpid/branches/QPID-2519/cpp/src/ssl.mk Fri Oct 21 14:42:12 2011
@@ -18,7 +18,7 @@
#
#
# Makefile fragment, conditionally included in Makefile.am
-#
+#
libsslcommon_la_SOURCES = \
qpid/sys/ssl/check.h \
qpid/sys/ssl/check.cpp \
@@ -47,7 +47,7 @@ ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFL
ssl_la_LDFLAGS = $(PLUGINLDFLAGS)
-dmodule_LTLIBRARIES += ssl.la
+dmoduleexec_LTLIBRARIES += ssl.la
sslconnector_la_SOURCES = \
qpid/client/SslConnector.cpp
@@ -60,5 +60,5 @@ sslconnector_la_CXXFLAGS = $(AM_CXXFLAGS
sslconnector_la_LDFLAGS = $(PLUGINLDFLAGS)
-cmodule_LTLIBRARIES += \
+cmoduleexec_LTLIBRARIES += \
sslconnector.la
Modified: qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp Fri Oct 21 14:42:12 2011
@@ -73,61 +73,6 @@
}
{
- boost 103200 -- we think Boost is responsible for these leaks.
- Memcheck:Leak
- fun:_Znwm
- fun:_ZN5boost15program_options??options_description*
-}
-
-{
- boost 103200 -- we think Boost is responsible for these leaks.
- Memcheck:Leak
- fun:_Znwm
- fun:_ZN5boost9unit_test9test_case*
-}
-
-{
- boost 103200 -- we think Boost is responsible for these leaks.
- Memcheck:Leak
- fun:calloc
- fun:_dlerror_run
- fun:dlopen@@GLIBC_2.2.5
- fun:_ZN4qpid3sys5Shlib4loadEPKc
- fun:_Z9testShlibv
- fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor8functionEv
- obj:/usr/lib64/libboost_unit_test_framework.so.1.32.0
- fun:_ZN5boost17execution_monitor7executeEbi
- fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor21execute_and_translateEPNS0_9test_caseEMS3_FvvEi
- fun:_ZN5boost9unit_test9test_case3runEv
- fun:_ZN5boost9unit_test10test_suite6do_runEv
- fun:_ZN5boost9unit_test9test_case3runEv
- fun:main
-}
-
-{
- boost 103200 -- we think Boost is responsible for these leaks.
- Memcheck:Leak
- fun:calloc
- fun:_dl_allocate_tls
- fun:pthread_create@@GLIBC_2.2.5
- fun:_ZN4qpid6broker5Timer5startEv
- fun:_ZN4qpid6broker5TimerC1Ev
- fun:_ZN4qpid6broker10DtxManagerC1Ev
- fun:_ZN4qpid6broker6BrokerC1ERKNS1_7OptionsE
- fun:_ZN4qpid6broker6Broker6createERKNS1_7OptionsE
- fun:_ZN15SessionFixtureTI15ProxyConnectionEC2Ev
- fun:_Z14testQueueQueryv
- fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor8functionEv
- obj:/usr/lib64/libboost_unit_test_framework.so.1.32.0
- fun:_ZN5boost17execution_monitor7executeEbi
- fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor21execute_and_translateEPNS0_9test_caseEMS3_FvvEi
- fun:_ZN5boost9unit_test9test_case3runEv
- fun:_ZN5boost9unit_test10test_suite6do_runEv
- fun:_ZN5boost9unit_test9test_case3runEv
- fun:main
-}
-
-{
INVESTIGATE
Memcheck:Leak
fun:calloc
@@ -155,25 +100,6 @@
}
{
- boost 103200 -- mgoulish -- fix this, sometime
- Memcheck:Leak
- fun:*
- fun:*
- obj:*
- fun:*
- fun:_ZN4qpid34options_description_less_easy_initclEPKcPKN5boost15program_options14value_semanticES2_
-}
-
-{
- boost 103200 -- mgoulish -- fix this, sometime
- Memcheck:Leak
- fun:*
- fun:*
- fun:*
- fun:_ZN4qpid34options_description_less_easy_initclEPKcPKN5boost15program_options14value_semanticES2_
-}
-
-{
INVESTIGATE
Memcheck:Param
socketcall.sendto(msg)
Modified: qpid/branches/QPID-2519/cpp/src/tests/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Address.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Address.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Address.cpp Fri Oct 21 14:42:12 2011
@@ -119,6 +119,17 @@ QPID_AUTO_TEST_CASE(testParseQuotedNameA
BOOST_CHECK_EQUAL(std::string("my subject with ; in it"), address.getSubject());
}
+QPID_AUTO_TEST_CASE(testParseOptionsWithEmptyStringAsValue)
+{
+ Address address("my-topic; {a:'', x:101}");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ Variant a = address.getOptions()["a"];
+ BOOST_CHECK_EQUAL(VAR_STRING, a.getType());
+ std::string aVal = a;
+ BOOST_CHECK(aVal.size() == 0);
+ BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64());
+}
+
QPID_AUTO_TEST_SUITE_END()
}}
Modified: qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h Fri Oct 21 14:42:12 2011
@@ -22,8 +22,6 @@
*
*/
-#include "SocketProxy.h"
-
#include "qpid/broker/Broker.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionImpl.h"
@@ -71,16 +69,15 @@ struct BrokerFixture : private boost::n
brokerThread = qpid::sys::Thread(*broker);
};
- void shutdownBroker()
- {
- broker->shutdown();
- broker = BrokerPtr();
+ void shutdownBroker() {
+ if (broker) {
+ broker->shutdown();
+ brokerThread.join();
+ broker = BrokerPtr();
+ }
}
- ~BrokerFixture() {
- if (broker) broker->shutdown();
- brokerThread.join();
- }
+ ~BrokerFixture() { shutdownBroker(); }
/** Open a connection to the broker. */
void open(qpid::client::Connection& c) {
@@ -97,20 +94,6 @@ struct LocalConnection : public qpid::cl
~LocalConnection() { close(); }
};
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
- SocketProxy proxy;
- ProxyConnection(int brokerPort) : proxy(brokerPort) {
- open("localhost", proxy.getPort());
- }
- ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) {
- qpid::client::ConnectionSettings proxySettings(s);
- proxySettings.port = proxy.getPort();
- open(proxySettings);
- }
- ~ProxyConnection() { close(); }
-};
-
/** Convenience class to create and open a connection and session
* and some related useful objects.
*/
@@ -147,7 +130,6 @@ struct SessionFixtureT : BrokerFixture,
};
typedef SessionFixtureT<LocalConnection> SessionFixture;
-typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
}} // namespace qpid::tests
Modified: qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp Fri Oct 21 14:42:12 2011
@@ -599,13 +599,12 @@ namespace qpid {
// populate the agent with multiple test objects
const size_t objCount = 50;
std::vector<TestManageable *> tmv;
- uint32_t objLen;
for (size_t i = 0; i < objCount; i++) {
std::stringstream key;
key << "testobj-" << i;
TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObject()->writePropertiesSize();
+ (void) tm->GetManagementObject()->writePropertiesSize();
agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
}
Modified: qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt Fri Oct 21 14:42:12 2011
@@ -107,7 +107,6 @@ set(unit_tests_to_build
MessagingSessionTests
SequenceSet
StringUtils
- IncompleteMessageList
RangeSet
AtomicValue
QueueTest
@@ -119,6 +118,7 @@ set(unit_tests_to_build
MessageTest
QueueRegistryTest
QueuePolicyTest
+ QueueFlowLimitTest
FramingTest
HeaderTest
SequenceNumberTest
@@ -264,6 +264,19 @@ add_executable (qpid-send qpid-send.cpp
target_link_libraries (qpid-send qpidmessaging)
remember_location(qpid-send)
+add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
+target_link_libraries (qpid-ping qpidclient)
+remember_location(qpid-ping)
+
+add_executable (datagen datagen.cpp ${platform_test_additions})
+target_link_libraries (datagen qpidclient)
+remember_location(datagen)
+
+add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
+target_link_libraries (msg_group_test qpidmessaging)
+remember_location(msg_group_test)
+
+
# qpid-perftest and qpid-latency-test are generally useful so install them
install (TARGETS qpid-perftest qpid-latency-test RUNTIME
DESTINATION ${QPID_INSTALL_BINDIR})
@@ -278,7 +291,7 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_S
add_test (unit_test ${test_wrap} ${unit_test_LOCATION})
add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix})
-add_test (qpid-client-test ${test_wrap} ${qpid-client_test_LOCATION})
+add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION})
add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100)
add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix})
add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet)
@@ -288,6 +301,7 @@ if (PYTHON_EXECUTABLE)
endif (PYTHON_EXECUTABLE)
add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
if (PYTHON_EXECUTABLE)
+ add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
if (BUILD_ACL)
add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})
Modified: qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp Fri Oct 21 14:42:12 2011
@@ -102,9 +102,9 @@ struct SimpleListener : public MessageLi
}
};
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
{
- ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) {
session.queueDeclare(arg::queue="my-queue");
}
};
@@ -150,16 +150,6 @@ QPID_AUTO_TEST_CASE(testDispatcherThread
BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
}
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspend0Timeout() {
- ClientSessionFixture fix;
- fix.session.suspend(); // session has 0 timeout.
- try {
- fix.connection.resume(fix.session);
- BOOST_FAIL("Expected InvalidArgumentException.");
- } catch(const InternalErrorException&) {}
-}
-
QPID_AUTO_TEST_CASE(testUseSuspendedError)
{
ClientSessionFixture fix;
@@ -171,18 +161,6 @@ QPID_AUTO_TEST_CASE(testUseSuspendedErro
} catch(const NotAttachedException&) {}
}
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspendResume() {
- ClientSessionFixture fix;
- fix.session.timeout(60);
- fix.session.suspend();
- // Make sure we are still subscribed after resume.
- fix.connection.resume(fix.session);
- fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
- BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData());
-}
-
-
QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
SimpleListener mylistener;
@@ -271,8 +249,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
Broker::Options opts;
opts.queueCleanInterval = 1;
+ opts.queueFlowStopRatio = 0;
+ opts.queueFlowResumeRatio = 0;
ClientSessionFixture fix(opts);
- fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+ FieldTable args;
+ args.setInt("qpid.max_count",10);
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (uint i = 0; i < 10; i++) {
Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
@@ -283,6 +265,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpirati
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
qpid::sys::sleep(2);
BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+ fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated
}
QPID_AUTO_TEST_CASE(testExpirationOnPop) {
Modified: qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp Fri Oct 21 14:42:12 2011
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
TopicExchange topic ("topic1", false, args);
intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+ msg1->insertCustomProperty("a", "abc");
DeliverableMessage dmsg1(msg1);
FieldTable args2;
Modified: qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp Fri Oct 21 14:42:12 2011
@@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() {
}
if (!dataDir.empty())
{
- int unused_ret; // Suppress warnings about ignoring return value.
- unused_ret = ::system(("rm -rf "+dataDir).c_str());
+ (void) ::system(("rm -rf "+dataDir).c_str());
}
}
Modified: qpid/branches/QPID-2519/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Makefile.am?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Makefile.am Fri Oct 21 14:42:12 2011
@@ -75,7 +75,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
MessagingThreadTests.cpp \
MessagingFixture.h \
ClientSessionTest.cpp \
- BrokerFixture.h SocketProxy.h \
+ BrokerFixture.h \
exception_test.cpp \
RefCounted.cpp \
SessionState.cpp logging.cpp \
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
InlineVector.cpp \
SequenceSet.cpp \
StringUtils.cpp \
- IncompleteMessageList.cpp \
RangeSet.cpp \
AtomicValue.cpp \
QueueTest.cpp \
@@ -99,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
MessageTest.cpp \
QueueRegistryTest.cpp \
QueuePolicyTest.cpp \
+ QueueFlowLimitTest.cpp \
FramingTest.cpp \
HeaderTest.cpp \
SequenceNumberTest.cpp \
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerOptions.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -286,31 +287,27 @@ check_PROGRAMS+=datagen
datagen_SOURCES=datagen.cpp
datagen_LDADD=$(lib_common) $(lib_client)
-check_PROGRAMS+=qrsh_server
-qrsh_server_SOURCES=qrsh_server.cpp
-qrsh_server_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh_run
-qrsh_run_SOURCES=qrsh_run.cpp
-qrsh_run_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh
-qrsh_SOURCES=qrsh.cpp
-qrsh_LDADD=$(lib_client)
-
check_PROGRAMS+=qpid-stream
qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
qpid_stream_SOURCES=qpid-stream.cpp
qpid_stream_LDADD=$(lib_messaging)
+check_PROGRAMS+=msg_group_test
+msg_group_test_INCLUDES=$(PUBLIC_INCLUDES)
+msg_group_test_SOURCES=msg_group_test.cpp
+msg_group_test_LDADD=$(lib_messaging)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
$(srcdir)/run_test
-system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
+system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
+ run_msg_group_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
+ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
+ run_queue_flow_limit_tests ipv6_test
EXTRA_DIST += \
run_test vg_check \
@@ -325,6 +322,8 @@ EXTRA_DIST += \
config.null \
ais_check \
run_federation_tests \
+ run_federation_sys_tests \
+ run_long_federation_sys_tests \
run_cli_tests \
run_acl_tests \
.valgrind.supp \
@@ -349,7 +348,10 @@ EXTRA_DIST += \
run_test.ps1 \
start_broker.ps1 \
stop_broker.ps1 \
- topictest.ps1
+ topictest.ps1 \
+ run_queue_flow_limit_tests \
+ run_msg_group_tests \
+ ipv6_test
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -360,7 +362,11 @@ CLEANFILES+=valgrind.out *.log *.vglog*
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_group_tests_soak \
+ stop_broker \
+ run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
@@ -372,7 +378,8 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
- sasl_test_setup.sh
+ sasl_test_setup.sh \
+ run_msg_group_tests_soak
check-long:
$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=
Modified: qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp Fri Oct 21 14:42:12 2011
@@ -51,7 +51,7 @@ class ReplayBufferChecker
QPID_AUTO_TEST_CASE(testReplay)
{
- ProxySessionFixture fix;
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
MessageReplayTracker tracker(10);
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testReplay)
QPID_AUTO_TEST_CASE(testCheckCompletion)
{
- ProxySessionFixture fix;
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
MessageReplayTracker tracker(10);
Modified: qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h Fri Oct 21 14:42:12 2011
@@ -27,15 +27,19 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
namespace qpid {
namespace tests {
+using qpid::types::Variant;
+
struct BrokerAdmin
{
qpid::client::Connection connection;
@@ -223,6 +227,119 @@ inline void receive(messaging::Receiver&
}
}
+
+class MethodInvoker
+{
+ public:
+ MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+ sender(session.createSender("qmf.default.direct/broker")),
+ receiver(session.createReceiver(replyTo)) {}
+
+ void createExchange(const std::string& name, const std::string& type, bool durable=false)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ params["properties"] = Variant::Map();
+ params["properties"].asMap()["exchange-type"] = type;
+ params["properties"].asMap()["durable"] = durable;
+ methodRequest("create", params);
+ }
+
+ void deleteExchange(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ methodRequest("delete", params);
+ }
+
+ void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ params["properties"] = options;
+ params["properties"].asMap()["durable"] = durable;
+ params["properties"].asMap()["auto-delete"] = autodelete;
+ methodRequest("create", params);
+ }
+
+ void deleteQueue(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ methodRequest("delete", params);
+ }
+
+ void bind(const std::string& exchange, const std::string& queue, const std::string& key,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ params["properties"] = options;
+ methodRequest("create", params);
+ }
+
+ void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ methodRequest("delete", params);
+ }
+
+ void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+ {
+ Variant::Map content;
+ Variant::Map objectId;
+ objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+ content["_object_id"] = objectId;
+ content["_method_name"] = method;
+ content["_arguments"] = inParams;
+
+ messaging::Message request;
+ request.setReplyTo(replyTo);
+ request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
+ request.getProperties()["qmf.opcode"] = "_method_request";
+ encode(content, request);
+
+ sender.send(request);
+
+ messaging::Message response;
+ if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
+ if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
+ std::string opcode = response.getProperties()["qmf.opcode"];
+ if (opcode == "_method_response") {
+ if (outParams) {
+ Variant::Map m;
+ decode(response, m);
+ *outParams = m["_arguments"].asMap();
+ }
+ } else if (opcode == "_exception") {
+ Variant::Map m;
+ decode(response, m);
+ throw Exception(QPID_MSG("Error: " << m["_values"]));
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
+ }
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
+ << response.getProperties()["x-amqp-0-10.app-id"]));
+ }
+ } else {
+ throw Exception(QPID_MSG("No response received"));
+ }
+ }
+ private:
+ messaging::Address replyTo;
+ messaging::Sender sender;
+ messaging::Receiver receiver;
+};
+
}} // namespace qpid::tests
#endif /*!TESTS_MESSAGINGFIXTURE_H*/
Modified: qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp Fri Oct 21 14:42:12 2011
@@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
fix.admin.deleteQueue("q");
}
+QPID_AUTO_TEST_CASE(testAssertExchangeOption)
+{
+ MessagingFixture fix;
+ std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}";
+ Sender s1 = fix.session.createSender(a1);
+ s1.close();
+ Receiver r1 = fix.session.createReceiver(a1);
+ r1.close();
+
+ std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}";
+ Sender s2 = fix.session.createSender(a2);
+ s2.close();
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed);
+
+ std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}";
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed);
+ Receiver r3 = fix.session.createReceiver(a3);
+ r3.close();
+
+ fix.admin.deleteExchange("e");
+}
+
QPID_AUTO_TEST_CASE(testGetSender)
{
QueueFixture fix;
@@ -890,6 +912,212 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
}
+QPID_AUTO_TEST_CASE(testQmfCreateAndDelete)
+{
+ MessagingFixture fix(Broker::Options(), true/*enable management*/);
+ MethodInvoker control(fix.session);
+ control.createQueue("my-queue");
+ control.createExchange("my-exchange", "topic");
+ control.bind("my-exchange", "my-queue", "subject1");
+
+ Sender sender = fix.session.createSender("my-exchange");
+ Receiver receiver = fix.session.createReceiver("my-queue");
+ Message out;
+ out.setSubject("subject1");
+ out.setContent("one");
+ sender.send(out);
+ Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ control.unbind("my-exchange", "my-queue", "subject1");
+ control.bind("my-exchange", "my-queue", "subject2");
+
+ out.setContent("two");
+ sender.send(out);//should be dropped
+
+ out.setSubject("subject2");
+ out.setContent("three");
+ sender.send(out);//should not be dropped
+
+ BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE));
+ sender.close();
+ receiver.close();
+
+ control.deleteExchange("my-exchange");
+ messaging::Session other = fix.connection.createSession();
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound);
+ }
+ control.deleteQueue("my-queue");
+ other = fix.connection.createSession();
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testRejectAndCredit)
+{
+ //Ensure credit is restored on completing rejected messages
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+
+ const uint count(10);
+ receiver.setCapacity(count);
+ for (uint i = 0; i < count; i++) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Message in;
+ for (uint i = 0; i < count; ++i) {
+ if (receiver.fetch(in, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ fix.session.reject(in);
+ } else {
+ BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str());
+ break;
+ }
+ }
+ //send another batch of messages
+ for (uint i = 0; i < count; i++) {
+ sender.send(Message((boost::format("Message_%1%") % (i+count)).str()));
+ }
+
+ for (uint i = 0; i < count; ++i) {
+ if (receiver.fetch(in, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str());
+ } else {
+ BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str());
+ break;
+ }
+ }
+ fix.session.acknowledge();
+ receiver.close();
+ sender.close();
+}
+
+QPID_AUTO_TEST_CASE(testTtlForever)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out("I want to live forever!");
+ out.setTtl(Duration::FOREVER);
+ sender.send(out, true);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(Duration::IMMEDIATE);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ BOOST_CHECK(in.getTtl() == Duration::FOREVER);
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+ Sender sender = fix.session.createSender(fix.topic);
+ Receiver receiver1 = fix.session.createReceiver(address);
+ {
+ ScopedSuppressLogging sl;
+ try {
+ fix.session.createReceiver(address);
+ fix.session.sync();
+ BOOST_FAIL("Expected exception.");
+ } catch (const MessagingException& /*e*/) {}
+ }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{
+ TopicFixture fix;
+ std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+ Receiver receiver1 = fix.session.createReceiver(address);
+ Receiver receiver2 = fix.session.createReceiver(address);
+ Sender sender = fix.session.createSender(fix.topic);
+ sender.send(Message("one"), true);
+ Message in = receiver1.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+ sender.send(Message("two"), true);
+ in = receiver2.fetch(Duration::IMMEDIATE);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ const uint count(20);
+ for (uint i = 0; i < count; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Session other = fix.connection.createSession();
+ Receiver receiver = other.createReceiver(fix.queue);
+ std::vector<Message> messages;
+ for (uint i = 0; i < count; ++i) {
+ Message msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ messages.push_back(msg);
+ }
+ const uint batch = 10;
+ other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+ messages.clear();
+ other.sync();
+ other.close();
+
+ other = fix.connection.createSession();
+ receiver = other.createReceiver(fix.queue);
+ Message msg;
+ for (uint i = 0; i < (count-batch); ++i) {
+ msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+ }
+ other.acknowledgeUpTo(msg);
+ other.sync();
+ other.close();
+
+ Message m;
+ //check queue is empty
+ BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
+QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str());
+ Message out("test-message");
+ out.setSubject("my-subject");
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(Duration::SECOND * 5);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject());
+}
+
+QPID_AUTO_TEST_CASE(testUnsubscribeOnClose)
+{
+ MessagingFixture fix;
+ Sender sender = fix.session.createSender("my-exchange/my-subject; {create: always, delete:sender, node:{type:topic, x-declare:{alternate-exchange:amq.fanout}}}");
+ Receiver receiver = fix.session.createReceiver("my-exchange/my-subject");
+ Receiver deadletters = fix.session.createReceiver("amq.fanout");
+
+ sender.send(Message("first"));
+ Message in = receiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("first"));
+ fix.session.acknowledge();
+ receiver.close();
+ sender.send(Message("second"));
+ in = deadletters.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(in.getContent(), std::string("second"));
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp Fri Oct 21 14:42:12 2011
@@ -23,12 +23,36 @@
#include "qmf/QueryImpl.h"
#include "qmf/SchemaImpl.h"
#include "qmf/exceptions.h"
-
+#include "qpid/messaging/Connection.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/ConsoleSessionImpl.h"
#include "unit_test.h"
+using namespace std;
using namespace qpid::types;
+using namespace qpid::messaging;
using namespace qmf;
+bool isReadable(int fd)
+{
+ fd_set rfds;
+ struct timeval tv;
+ int nfds, result;
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+ nfds = fd + 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ result = select(nfds, &rfds, NULL, NULL, &tv);
+
+ return result > 0;
+}
+
namespace qpid {
namespace tests {
@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
BOOST_CHECK_THROW(method.getArgument(3), QmfException);
}
+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
+{
+ Connection connection("localhost");
+ AgentSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testGetHandle)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ BOOST_CHECK(notifier.getHandle() > 0);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadable)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ for (int i = 0; i < 15; i++)
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testDeleteNotifier)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+ {
+ posix::EventNotifier notifier(session);
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+ }
+ BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp Fri Oct 21 14:42:12 2011
@@ -147,7 +147,7 @@ struct EventRecorder
QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
{
- ProxySessionFixture fixture;
+ SessionFixture fixture;
//register dummy event listener to broker
EventRecorder listener;
fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
@@ -194,7 +194,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEvent
QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
{
- ProxySessionFixture fixture;
+ SessionFixture fixture;
//register dummy event listener to broker
EventRecorder listener;
fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
Modified: qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp Fri Oct 21 14:42:12 2011
@@ -23,6 +23,7 @@
#include "test_tools.h"
#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
@@ -38,6 +39,7 @@ namespace tests {
QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
+namespace {
QueuedMessage createMessage(uint32_t size)
{
QueuedMessage msg;
@@ -45,7 +47,7 @@ QueuedMessage createMessage(uint32_t siz
MessageUtils::addContent(msg.payload, std::string (size, 'x'));
return msg;
}
-
+}
QPID_AUTO_TEST_CASE(testCount)
{
@@ -150,7 +152,7 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (int i = 0; i < 10; i++) {
@@ -185,7 +187,7 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
@@ -257,7 +259,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -283,7 +285,7 @@ QPID_AUTO_TEST_CASE(testPolicyWithDtx)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-policy-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -340,8 +342,10 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
//fallback to rejecting messages
QueueOptions args;
args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+ // Disable flow control, or else we'll never hit the max limit
+ args.setInt(QueueFlowLimit::flowStopCountKey, 0);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -367,7 +371,7 @@ QPID_AUTO_TEST_CASE(testPolicyFailureOnC
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("q");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
f.session.txSelect();
@@ -382,8 +386,9 @@ QPID_AUTO_TEST_CASE(testCapacityConversi
{
FieldTable args;
args.setString("qpid.max_count", "5");
+ args.setString("qpid.flow_stop_count", "0");
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("q");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (int i = 0; i < 5; i++) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org