You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [15/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Thread.cpp Fri Oct 21 14:42:12 2011
@@ -19,6 +19,11 @@
  *
  */
 
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/windows/check.h"
@@ -26,50 +31,204 @@
 #include <process.h>
 #include <windows.h>
 
-namespace {
-unsigned __stdcall runRunnable(void* p)
-{
-    static_cast<qpid::sys::Runnable*>(p)->run();
-    _endthreadex(0);
-    return 0;
-}
-}
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest.  It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not.  Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure.  Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * Thread-local storage (TLS) provides a lockless solution for static
+ * library builds.  The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain.  The _DLL macro marks the dynamic (DLL) case.
+ */
 
 namespace qpid {
 namespace sys {
 
 class ThreadPrivate {
+public:
     friend class Thread;
+    friend unsigned __stdcall runThreadPrivate(void*);
+    typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+    ~ThreadPrivate();
 
-    HANDLE threadHandle;
+private:
     unsigned threadId;
-    
-    ThreadPrivate(Runnable* runnable) {
-        uintptr_t h =  _beginthreadex(0,
-                                      0,
-                                      runRunnable,
-                                      runnable,
-                                      0,
-                                      &threadId);
-        QPID_WINDOWS_CHECK_CRT_NZ(h);
-        threadHandle = reinterpret_cast<HANDLE>(h);
+    HANDLE threadHandle;
+    HANDLE initCompleted;
+    HANDLE qpidThreadDone;
+    Runnable* runnable;
+    shared_ptr keepAlive;
+
+    ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+                      qpidThreadDone(NULL), runnable(NULL) {
+        threadHandle =  OpenThread (SYNCHRONIZE, FALSE, threadId);
+        QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
     }
-    
-    ThreadPrivate()
-      : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+
+    ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+                                 qpidThreadDone(NULL), runnable(r) {}
+
+    void start(shared_ptr& p);
+    static shared_ptr createThread(Runnable* r);
 };
 
+}}  // namespace qpid::sys 
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+  public:
+    ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+    ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+  private:
+    CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+    if (tlsIndex != TLS_OUT_OF_INDEXES)
+        return tlsIndex;        // already set
+
+    DWORD trialIndex = TlsAlloc();
+    QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+    
+    // only one thread gets to set the value
+    DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+    if (actualIndex == TLS_OUT_OF_INDEXES)
+        return trialIndex;      // we won the race
+    else {
+        TlsFree(trialIndex);
+        return actualIndex;
+    }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+    ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+    TlsSetValue(getTlsIndex(), threadPrivate);
+
+    WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+    CloseHandle (threadPrivate->initCompleted);
+    threadPrivate->initCompleted = NULL;
+
+    try {
+        threadPrivate->runnable->run();
+    } catch (...) {
+        // not our concern
+    }
+
+    SetEvent (threadPrivate->qpidThreadDone); // allow join()
+    threadPrivate->keepAlive.reset();         // may run ThreadPrivate destructor
+
+#ifdef _DLL
+    {
+        ScopedCriticalSection l(threadLock);
+        if (--runningThreads == 0)
+            SetEvent(threadsDone);
+    }
+#endif
+    return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+    ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+    tp->start(tp);
+    return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+    getTlsIndex();              // fail here if OS problem, not in new thread
+
+    initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+    QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+    qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+    QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+    {
+        ScopedCriticalSection l(threadLock);
+        if (terminating)
+            throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+        runningThreads++;
+    }
+#endif
+
+    uintptr_t h =  _beginthreadex(0,
+                                  0,
+                                  runThreadPrivate,
+                                  (void *)this,
+                                  0,
+                                  &threadId);
+
+#ifdef _DLL
+    if (h == NULL) {
+        ScopedCriticalSection l(threadLock);
+        if (--runningThreads == 0)
+            SetEvent(threadsDone);
+    }
+#endif
+
+    QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+    // Success
+    keepAlive = tp;
+    threadHandle = reinterpret_cast<HANDLE>(h);
+    SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+    if (threadHandle)
+        CloseHandle (threadHandle);
+    if (initCompleted)
+        CloseHandle (initCompleted);
+    if (qpidThreadDone)
+        CloseHandle (qpidThreadDone);
+}
+
+
 Thread::Thread() {}
 
-Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
 
-Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
 
 Thread::operator bool() {
     return impl;
 }
 
 bool Thread::operator==(const Thread& t) const {
+    if (!impl || !t.impl)
+        return false;
     return impl->threadId == t.impl->threadId;
 }
 
@@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t)
 
 void Thread::join() {
     if (impl) {
-        DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+        DWORD status;
+        if (impl->runnable) {
+            HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+            // wait for either.  threadHandle not signalled if loader
+            // lock held (FreeLibrary).  qpidThreadDone not signalled
+            // if thread terminated by exit().
+            status = WaitForMultipleObjects (2, handles, false, INFINITE);
+        }
+        else
+            status = WaitForSingleObject (impl->threadHandle, INFINITE);
         QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
-        CloseHandle (impl->threadHandle);
-        impl->threadHandle = 0;
     }
 }
 
@@ -92,9 +258,70 @@ unsigned long Thread::logId() {
 
 /* static */
 Thread Thread::current() {
+    ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
     Thread t;
-    t.impl.reset(new ThreadPrivate());
+    if (tlsValue != NULL) {
+        // called from within Runnable->run(), so keepAlive has positive use count
+        t.impl = tlsValue->keepAlive;
+    }
+    else
+        t.impl.reset(new ThreadPrivate());
     return t;
 }
 
-}}  /* qpid::sys */
+}}  // namespace qpid::sys
+
+
+#ifdef _DLL
+
+// DllMain: possibly called many times in a process lifetime if the DLL
+// is loaded and freed repeatedly.  Be mindful of the Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+    switch (reason) {
+    case DLL_PROCESS_ATTACH:
+        InitializeCriticalSection(&threadLock);
+        threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+        break;
+
+    case DLL_PROCESS_DETACH:
+        terminating = true;
+        if (reserved != NULL) {
+            // process exit(): threads are stopped arbitrarily and
+            // possibly in an inconsistent state.  Not even threadLock
+            // can be trusted.  All static destructors have been
+            // called at this point and any resources this unit knows
+            // about will be released as part of process tear down by
+            // the OS.  Accordingly, do nothing.
+            return TRUE;
+        }
+        else {
+            // FreeLibrary(): threads are still running and we are
+            // encouraged to clean up to avoid leaks.  Mostly we just
+            // want any straggler threads to finish and notify
+            // threadsDone as the last thing they do.
+            while (1) {
+                {
+                    ScopedCriticalSection l(threadLock);
+                    if (runningThreads == 0)
+                        break;
+                    ResetEvent(threadsDone);
+                }
+                WaitForSingleObject(threadsDone, INFINITE);
+            }
+            if (tlsIndex != TLS_OUT_OF_INDEXES)
+                TlsFree(getTlsIndex());
+            CloseHandle(threadsDone);
+            DeleteCriticalSection(&threadLock);
+        }
+        break;
+
+    case DLL_THREAD_ATTACH:
+    case DLL_THREAD_DETACH:
+        break;
+    }
+    return TRUE;
+}
+
+#endif

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Time.cpp Fri Oct 21 14:42:12 2011
@@ -27,6 +27,17 @@
 
 using namespace boost::posix_time;
 
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start;
+double freq = 1.0;
+
+}
+
 namespace qpid {
 namespace sys {
 
@@ -91,10 +102,35 @@ void outputFormattedNow(std::ostream& o)
     char time_string[100];
 
     ::time( &rawtime );
+#ifdef _MSC_VER
     ::localtime_s(&timeinfo, &rawtime);
+#else
+    timeinfo = *(::localtime(&rawtime));
+#endif
     ::strftime(time_string, 100,
                "%Y-%m-%d %H:%M:%S",
                &timeinfo);
     o << time_string << " ";
 }
+
+void outputHiresNow(std::ostream& o) {
+    if (!timeInitialized) {
+        start.QuadPart = 0;
+        LARGE_INTEGER iFreq;
+        iFreq.QuadPart = 1;
+        QueryPerformanceCounter(&start);
+        QueryPerformanceFrequency(&iFreq);
+        freq = static_cast<double>(iFreq.QuadPart);
+        timeInitialized = true;
+    }
+    LARGE_INTEGER iNow;
+    iNow.QuadPart = 0;
+    QueryPerformanceCounter(&iNow);
+    iNow.QuadPart -= start.QuadPart;
+    if (iNow.QuadPart < 0)
+        iNow.QuadPart = 0;
+    double now = static_cast<double>(iNow.QuadPart);
+    now /= freq;                 // now is seconds after this
+    o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+}
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/uuid.cpp Fri Oct 21 14:42:12 2011
@@ -19,7 +19,7 @@
  *
  */
 
-#include <Rpc.h>
+#include <rpc.h>
 #ifdef uuid_t   /*  Done in rpcdce.h */
 #  undef uuid_t
 #endif
@@ -52,7 +52,11 @@ int uuid_parse (const char *in, uuid_t u
 void uuid_unparse (const uuid_t uu, char *out) {
     unsigned char *formatted;
     if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) {
+#ifdef _MSC_VER
         strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE);
+#else
+        strncpy (out, (char*)formatted, 36+1);
+#endif
         RpcStringFree(&formatted);
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/types/Uuid.cpp Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
  */
 #include "qpid/types/Uuid.h"
 #include "qpid/sys/uuid.h"
+#include "qpid/sys/IntegerTypes.h"
 #include <sstream>
 #include <iostream>
 #include <string.h>
@@ -71,7 +72,8 @@ void Uuid::clear()
 // Force int 0/!0 to false/true; avoids compile warnings.
 bool Uuid::isNull() const
 {
-    return !!uuid_is_null(bytes);
+    // This const cast is for Solaris which has non const arguments
+    return !!uuid_is_null(const_cast<uint8_t*>(bytes));
 }
 
 Uuid::operator bool() const { return !isNull(); }
@@ -86,7 +88,8 @@ const unsigned char* Uuid::data() const
 
 bool operator==(const Uuid& a, const Uuid& b)
 {
-    return uuid_compare(a.bytes, b.bytes) == 0;
+    // This const cast is for Solaris which has non const arguments
+    return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) == 0;
 }
 
 bool operator!=(const Uuid& a, const Uuid& b)
@@ -96,22 +99,26 @@ bool operator!=(const Uuid& a, const Uui
 
 bool operator<(const Uuid& a, const Uuid& b)
 {
-    return uuid_compare(a.bytes, b.bytes) < 0;
+    // This const cast is for Solaris which has non const arguments
+    return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) < 0;
 }
 
 bool operator>(const Uuid& a, const Uuid& b)
 {
-    return uuid_compare(a.bytes, b.bytes) > 0;
+    // This const cast is for Solaris which has non const arguments
+    return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) > 0;
 }
 
 bool operator<=(const Uuid& a, const Uuid& b)
 {
-    return uuid_compare(a.bytes, b.bytes) <= 0;
+    // This const cast is for Solaris which has non const arguments
+    return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) <= 0;
 }
 
 bool operator>=(const Uuid& a, const Uuid& b)
 {
-    return uuid_compare(a.bytes, b.bytes) >= 0;
+    // This const cast is for Solaris which has non const arguments
+    return uuid_compare(const_cast<uint8_t*>(a.bytes), const_cast<uint8_t*>(b.bytes)) >= 0;
 }
 
 ostream& operator<<(ostream& out, Uuid uuid)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/types/Variant.cpp Fri Oct 21 14:42:12 2011
@@ -19,7 +19,6 @@
  *
  */
 #include "qpid/types/Variant.h"
-#include "qpid/Msg.h"
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
@@ -108,15 +107,27 @@ class VariantImpl
     } value;
     std::string encoding;//optional encoding for variable length data
 
-    std::string getTypeName(VariantType type) const;
     template<class T> T convertFromString() const
     {
         std::string* s = reinterpret_cast<std::string*>(value.v);
-        try {
-            return boost::lexical_cast<T>(*s);
-        } catch(const boost::bad_lexical_cast&) {
-            throw InvalidConversion(QPID_MSG("Cannot convert " << *s));
+        if (std::numeric_limits<T>::is_signed || s->find('-') != 0) {
+            //lexical_cast won't fail if string is a negative number and T is unsigned
+            try {
+                return boost::lexical_cast<T>(*s);
+            } catch(const boost::bad_lexical_cast&) {
+                //don't return, throw exception below
+            }
+        } else {
+            //T is unsigned and number starts with '-'
+            try {
+                //handle special case of negative zero
+                if (boost::lexical_cast<int>(*s) == 0) return 0;
+                //else it's a non-zero negative number so throw exception at end of function
+            } catch(const boost::bad_lexical_cast&) {
+                //wasn't a valid int, therefore not a valid uint
+            }
         }
+        throw InvalidConversion(QPID_MSG("Cannot convert " << *s));
     }
 };
 
@@ -370,11 +381,11 @@ int8_t VariantImpl::asInt8() const
               return int8_t(value.ui16);
           break;
       case VAR_UINT32:
-          if (value.ui32 <= (uint) std::numeric_limits<int8_t>::max())
+          if (value.ui32 <= (uint32_t) std::numeric_limits<int8_t>::max())
               return int8_t(value.ui32);
           break;
       case VAR_UINT64:
-          if (value.ui64 <= (uint) std::numeric_limits<int8_t>::max())
+          if (value.ui64 <= (uint64_t) std::numeric_limits<int8_t>::max())
               return int8_t(value.ui64);
           break;
       case VAR_STRING: return convertFromString<int8_t>();
@@ -401,11 +412,11 @@ int16_t VariantImpl::asInt16() const
               return int16_t(value.ui16);
           break;
       case VAR_UINT32:
-          if (value.ui32 <= (uint) std::numeric_limits<int16_t>::max())
+          if (value.ui32 <= (uint32_t) std::numeric_limits<int16_t>::max())
               return int16_t(value.ui32);
           break;
       case VAR_UINT64:
-          if (value.ui64 <= (uint) std::numeric_limits<int16_t>::max())
+          if (value.ui64 <= (uint64_t) std::numeric_limits<int16_t>::max())
               return int16_t(value.ui64);
           break;
       case VAR_STRING: return convertFromString<int16_t>();
@@ -430,7 +441,7 @@ int32_t VariantImpl::asInt32() const
               return int32_t(value.ui32);
           break;
       case VAR_UINT64:
-        if (value.ui64 <= (uint32_t) std::numeric_limits<int32_t>::max())
+        if (value.ui64 <= (uint64_t) std::numeric_limits<int32_t>::max())
               return int32_t(value.ui64);
           break;
       case VAR_STRING: return convertFromString<int32_t>();
@@ -582,7 +593,7 @@ const std::string& VariantImpl::getStrin
 void VariantImpl::setEncoding(const std::string& s) { encoding = s; }
 const std::string& VariantImpl::getEncoding() const { return encoding; }
 
-std::string VariantImpl::getTypeName(VariantType type) const
+std::string getTypeName(VariantType type)
 {
     switch (type) {
       case VAR_VOID: return "void";

Modified: qpid/branches/QPID-2519/cpp/src/replication.mk
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/replication.mk?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/replication.mk (original)
+++ qpid/branches/QPID-2519/cpp/src/replication.mk Fri Oct 21 14:42:12 2011
@@ -19,14 +19,14 @@
 # Make file for building two plugins for asynchronously replicating
 # queues.
 
-dmodule_LTLIBRARIES += replicating_listener.la replication_exchange.la
+dmoduleexec_LTLIBRARIES += replicating_listener.la replication_exchange.la
 
 # a queue event listener plugin that creates messages on a replication
 # queue corresponding to enqueue and dequeue events:
 replicating_listener_la_SOURCES =  \
 	qpid/replication/constants.h \
 	qpid/replication/ReplicatingEventListener.cpp \
-	qpid/replication/ReplicatingEventListener.h 
+	qpid/replication/ReplicatingEventListener.h
 
 replicating_listener_la_LIBADD = libqpidbroker.la
 if SUNOS
@@ -41,7 +41,7 @@ replicating_listener_la_LDFLAGS = $(PLUG
 replication_exchange_la_SOURCES =  \
 	qpid/replication/constants.h \
 	qpid/replication/ReplicationExchange.cpp \
-	qpid/replication/ReplicationExchange.h 
+	qpid/replication/ReplicationExchange.h
 
 replication_exchange_la_LIBADD = libqpidbroker.la
 

Modified: qpid/branches/QPID-2519/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/ssl.mk?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/ssl.mk (original)
+++ qpid/branches/QPID-2519/cpp/src/ssl.mk Fri Oct 21 14:42:12 2011
@@ -18,7 +18,7 @@
 #
 #
 # Makefile fragment, conditionally included in Makefile.am
-# 
+#
 libsslcommon_la_SOURCES = \
   qpid/sys/ssl/check.h \
   qpid/sys/ssl/check.cpp \
@@ -47,7 +47,7 @@ ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFL
 
 ssl_la_LDFLAGS = $(PLUGINLDFLAGS)
 
-dmodule_LTLIBRARIES += ssl.la
+dmoduleexec_LTLIBRARIES += ssl.la
 
 sslconnector_la_SOURCES = \
   qpid/client/SslConnector.cpp
@@ -60,5 +60,5 @@ sslconnector_la_CXXFLAGS = $(AM_CXXFLAGS
 
 sslconnector_la_LDFLAGS = $(PLUGINLDFLAGS)
 
-cmodule_LTLIBRARIES += \
+cmoduleexec_LTLIBRARIES += \
   sslconnector.la

Modified: qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/.valgrind.supp Fri Oct 21 14:42:12 2011
@@ -73,61 +73,6 @@
 }
 
 {
-   boost 103200 -- we think Boost is responsible for these leaks.
-   Memcheck:Leak
-   fun:_Znwm
-   fun:_ZN5boost15program_options??options_description*
-}
-
-{
-   boost 103200 -- we think Boost is responsible for these leaks.
-   Memcheck:Leak
-   fun:_Znwm
-   fun:_ZN5boost9unit_test9test_case*
-}
-
-{
-   boost 103200 -- we think Boost is responsible for these leaks.
-   Memcheck:Leak
-   fun:calloc
-   fun:_dlerror_run
-   fun:dlopen@@GLIBC_2.2.5
-   fun:_ZN4qpid3sys5Shlib4loadEPKc
-   fun:_Z9testShlibv
-   fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor8functionEv
-   obj:/usr/lib64/libboost_unit_test_framework.so.1.32.0
-   fun:_ZN5boost17execution_monitor7executeEbi
-   fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor21execute_and_translateEPNS0_9test_caseEMS3_FvvEi
-   fun:_ZN5boost9unit_test9test_case3runEv
-   fun:_ZN5boost9unit_test10test_suite6do_runEv
-   fun:_ZN5boost9unit_test9test_case3runEv
-   fun:main
-}
-
-{
-   boost 103200 -- we think Boost is responsible for these leaks.
-   Memcheck:Leak
-   fun:calloc
-   fun:_dl_allocate_tls
-   fun:pthread_create@@GLIBC_2.2.5
-   fun:_ZN4qpid6broker5Timer5startEv
-   fun:_ZN4qpid6broker5TimerC1Ev
-   fun:_ZN4qpid6broker10DtxManagerC1Ev
-   fun:_ZN4qpid6broker6BrokerC1ERKNS1_7OptionsE
-   fun:_ZN4qpid6broker6Broker6createERKNS1_7OptionsE
-   fun:_ZN15SessionFixtureTI15ProxyConnectionEC2Ev
-   fun:_Z14testQueueQueryv
-   fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor8functionEv
-   obj:/usr/lib64/libboost_unit_test_framework.so.1.32.0
-   fun:_ZN5boost17execution_monitor7executeEbi
-   fun:_ZN5boost9unit_test9ut_detail17unit_test_monitor21execute_and_translateEPNS0_9test_caseEMS3_FvvEi
-   fun:_ZN5boost9unit_test9test_case3runEv
-   fun:_ZN5boost9unit_test10test_suite6do_runEv
-   fun:_ZN5boost9unit_test9test_case3runEv
-   fun:main
-}
-
-{
    INVESTIGATE
    Memcheck:Leak
    fun:calloc
@@ -155,25 +100,6 @@
 }
 
 {
-   boost 103200 -- mgoulish -- fix this, sometime
-   Memcheck:Leak
-   fun:*
-   fun:*
-   obj:*
-   fun:*
-   fun:_ZN4qpid34options_description_less_easy_initclEPKcPKN5boost15program_options14value_semanticES2_
-}  
-
-{
-   boost 103200 -- mgoulish -- fix this, sometime
-   Memcheck:Leak
-   fun:*
-   fun:*
-   fun:*
-   fun:_ZN4qpid34options_description_less_easy_initclEPKcPKN5boost15program_options14value_semanticES2_
-}
-
-{
    INVESTIGATE
    Memcheck:Param
    socketcall.sendto(msg)

Modified: qpid/branches/QPID-2519/cpp/src/tests/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Address.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Address.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Address.cpp Fri Oct 21 14:42:12 2011
@@ -119,6 +119,17 @@ QPID_AUTO_TEST_CASE(testParseQuotedNameA
     BOOST_CHECK_EQUAL(std::string("my subject with ; in it"), address.getSubject());
 }
 
+QPID_AUTO_TEST_CASE(testParseOptionsWithEmptyStringAsValue)
+{
+    Address address("my-topic; {a:'', x:101}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    Variant a = address.getOptions()["a"];
+    BOOST_CHECK_EQUAL(VAR_STRING, a.getType());
+    std::string aVal = a;
+    BOOST_CHECK(aVal.size() == 0);
+    BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/BrokerFixture.h Fri Oct 21 14:42:12 2011
@@ -22,8 +22,6 @@
  *
  */
 
-#include "SocketProxy.h"
-
 #include "qpid/broker/Broker.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/ConnectionImpl.h"
@@ -71,16 +69,15 @@ struct  BrokerFixture : private boost::n
         brokerThread = qpid::sys::Thread(*broker);
     };
 
-    void shutdownBroker()
-    {
-        broker->shutdown();
-        broker = BrokerPtr();
+    void shutdownBroker() {
+        if (broker) {
+            broker->shutdown();
+            brokerThread.join();
+            broker = BrokerPtr();
+        }
     }
 
-    ~BrokerFixture() {
-        if (broker) broker->shutdown();
-        brokerThread.join();
-    }
+    ~BrokerFixture() {  shutdownBroker(); }
 
     /** Open a connection to the broker. */
     void open(qpid::client::Connection& c) {
@@ -97,20 +94,6 @@ struct LocalConnection : public qpid::cl
     ~LocalConnection() { close(); }
 };
 
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
-    SocketProxy proxy;
-    ProxyConnection(int brokerPort) : proxy(brokerPort) {
-        open("localhost", proxy.getPort());
-    }
-    ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) {
-        qpid::client::ConnectionSettings proxySettings(s);
-        proxySettings.port = proxy.getPort();
-        open(proxySettings);
-    }
-    ~ProxyConnection() { close(); }
-};
-
 /** Convenience class to create and open a connection and session
  * and some related useful objects.
  */
@@ -147,7 +130,6 @@ struct  SessionFixtureT : BrokerFixture,
 };
 
 typedef SessionFixtureT<LocalConnection> SessionFixture;
-typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
 
 }} // namespace qpid::tests
 

Modified: qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/BrokerMgmtAgent.cpp Fri Oct 21 14:42:12 2011
@@ -599,13 +599,12 @@ namespace qpid {
             // populate the agent with multiple test objects
             const size_t objCount = 50;
             std::vector<TestManageable *> tmv;
-            uint32_t objLen;
 
             for (size_t i = 0; i < objCount; i++) {
                 std::stringstream key;
                 key << "testobj-" << i;
                 TestManageable *tm = new TestManageable(agent, key.str());
-                objLen = tm->GetManagementObject()->writePropertiesSize();
+                (void) tm->GetManagementObject()->writePropertiesSize();
                 agent->addObject(tm->GetManagementObject(), key.str());
                 tmv.push_back(tm);
             }

Modified: qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/CMakeLists.txt Fri Oct 21 14:42:12 2011
@@ -107,7 +107,6 @@ set(unit_tests_to_build
     MessagingSessionTests
     SequenceSet
     StringUtils
-    IncompleteMessageList
     RangeSet
     AtomicValue
     QueueTest
@@ -119,6 +118,7 @@ set(unit_tests_to_build
     MessageTest
     QueueRegistryTest
     QueuePolicyTest
+    QueueFlowLimitTest
     FramingTest
     HeaderTest
     SequenceNumberTest
@@ -264,6 +264,19 @@ add_executable (qpid-send qpid-send.cpp 
 target_link_libraries (qpid-send qpidmessaging)
 remember_location(qpid-send)
 
+add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
+target_link_libraries (qpid-ping qpidclient)
+remember_location(qpid-ping)
+
+add_executable (datagen datagen.cpp ${platform_test_additions})
+target_link_libraries (datagen qpidclient)
+remember_location(datagen)
+
+add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
+target_link_libraries (msg_group_test qpidmessaging)
+remember_location(msg_group_test)
+
+
 # qpid-perftest and qpid-latency-test are generally useful so install them
 install (TARGETS qpid-perftest qpid-latency-test RUNTIME
          DESTINATION ${QPID_INSTALL_BINDIR})
@@ -278,7 +291,7 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_S
 
 add_test (unit_test ${test_wrap} ${unit_test_LOCATION})
 add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix})
-add_test (qpid-client-test ${test_wrap} ${qpid-client_test_LOCATION})
+add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION})
 add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100)
 add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix})
 add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet)
@@ -288,6 +301,7 @@ if (PYTHON_EXECUTABLE)
 endif (PYTHON_EXECUTABLE)
 add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
 if (PYTHON_EXECUTABLE)
+  add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
   add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
 if (BUILD_ACL)
   add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})

Modified: qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ClientSessionTest.cpp Fri Oct 21 14:42:12 2011
@@ -102,9 +102,9 @@ struct SimpleListener : public MessageLi
     }
 };
 
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
 {
-    ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+    ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) {
         session.queueDeclare(arg::queue="my-queue");
     }
 };
@@ -150,16 +150,6 @@ QPID_AUTO_TEST_CASE(testDispatcherThread
         BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
 }
 
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspend0Timeout() {
-    ClientSessionFixture fix;
-    fix.session.suspend();  // session has 0 timeout.
-    try {
-        fix.connection.resume(fix.session);
-        BOOST_FAIL("Expected InvalidArgumentException.");
-    } catch(const InternalErrorException&) {}
-}
-
 QPID_AUTO_TEST_CASE(testUseSuspendedError)
 {
     ClientSessionFixture fix;
@@ -171,18 +161,6 @@ QPID_AUTO_TEST_CASE(testUseSuspendedErro
     } catch(const NotAttachedException&) {}
 }
 
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspendResume() {
-    ClientSessionFixture fix;
-    fix.session.timeout(60);
-    fix.session.suspend();
-    // Make sure we are still subscribed after resume.
-    fix.connection.resume(fix.session);
-    fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
-    BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData());
-}
-
-
 QPID_AUTO_TEST_CASE(testSendToSelf) {
     ClientSessionFixture fix;
     SimpleListener mylistener;
@@ -271,8 +249,12 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
 QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
     Broker::Options opts;
     opts.queueCleanInterval = 1;
+    opts.queueFlowStopRatio = 0;
+    opts.queueFlowResumeRatio = 0;
     ClientSessionFixture fix(opts);
-    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+    FieldTable args;
+    args.setInt("qpid.max_count",10);
+    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
 
     for (uint i = 0; i < 10; i++) {
         Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
@@ -283,6 +265,7 @@ QPID_AUTO_TEST_CASE(testPeriodicExpirati
     BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
     qpid::sys::sleep(2);
     BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+    fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated
 }
 
 QPID_AUTO_TEST_CASE(testExpirationOnPop) {

Modified: qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ExchangeTest.cpp Fri Oct 21 14:42:12 2011
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     TopicExchange topic ("topic1", false, args);
 
     intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-    msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+    msg1->insertCustomProperty("a", "abc");
     DeliverableMessage dmsg1(msg1);
 
     FieldTable args2;

Modified: qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/ForkedBroker.cpp Fri Oct 21 14:42:12 2011
@@ -68,8 +68,7 @@ ForkedBroker::~ForkedBroker() {
     }
     if (!dataDir.empty())
     {
-        int unused_ret; // Suppress warnings about ignoring return value.
-        unused_ret = ::system(("rm -rf "+dataDir).c_str());
+        (void) ::system(("rm -rf "+dataDir).c_str());
     }
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Makefile.am?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Makefile.am Fri Oct 21 14:42:12 2011
@@ -75,7 +75,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	MessagingThreadTests.cpp \
 	MessagingFixture.h \
 	ClientSessionTest.cpp \
-	BrokerFixture.h SocketProxy.h \
+	BrokerFixture.h \
 	exception_test.cpp \
 	RefCounted.cpp \
 	SessionState.cpp logging.cpp \
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	InlineVector.cpp \
 	SequenceSet.cpp \
 	StringUtils.cpp \
-	IncompleteMessageList.cpp \
 	RangeSet.cpp \
 	AtomicValue.cpp \
 	QueueTest.cpp \
@@ -99,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	MessageTest.cpp \
 	QueueRegistryTest.cpp \
 	QueuePolicyTest.cpp \
+	QueueFlowLimitTest.cpp \
 	FramingTest.cpp \
 	HeaderTest.cpp \
 	SequenceNumberTest.cpp \
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	Variant.cpp \
 	Address.cpp \
 	ClientMessage.cpp \
-	Qmf2.cpp
+	Qmf2.cpp \
+	BrokerOptions.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -286,31 +287,27 @@ check_PROGRAMS+=datagen
 datagen_SOURCES=datagen.cpp
 datagen_LDADD=$(lib_common) $(lib_client)
 
-check_PROGRAMS+=qrsh_server
-qrsh_server_SOURCES=qrsh_server.cpp
-qrsh_server_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh_run
-qrsh_run_SOURCES=qrsh_run.cpp
-qrsh_run_LDADD=$(lib_client)
-
-check_PROGRAMS+=qrsh
-qrsh_SOURCES=qrsh.cpp
-qrsh_LDADD=$(lib_client)
-
 check_PROGRAMS+=qpid-stream
 qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_stream_SOURCES=qpid-stream.cpp
 qpid_stream_LDADD=$(lib_messaging)
 
+check_PROGRAMS+=msg_group_test
+msg_group_test_INCLUDES=$(PUBLIC_INCLUDES)
+msg_group_test_SOURCES=msg_group_test.cpp
+msg_group_test_LDADD=$(lib_messaging)
+
 TESTS_ENVIRONMENT = \
     VALGRIND=$(VALGRIND) \
     LIBTOOL="$(LIBTOOL)" \
     QPID_DATA_DIR= \
     $(srcdir)/run_test 
 
-system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
+system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
+  run_msg_group_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
+  run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
+  run_queue_flow_limit_tests ipv6_test
 
 EXTRA_DIST +=								\
   run_test vg_check							\
@@ -325,6 +322,8 @@ EXTRA_DIST +=								\
   config.null								\
   ais_check								\
   run_federation_tests							\
+  run_federation_sys_tests                  \
+  run_long_federation_sys_tests             \
   run_cli_tests								\
   run_acl_tests								\
   .valgrind.supp							\
@@ -349,7 +348,10 @@ EXTRA_DIST +=								\
   run_test.ps1								\
   start_broker.ps1							\
   stop_broker.ps1							\
-  topictest.ps1
+  topictest.ps1                                                         \
+  run_queue_flow_limit_tests						\
+  run_msg_group_tests							\
+  ipv6_test
 
 check_LTLIBRARIES += libdlclose_noop.la
 libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -360,7 +362,11 @@ CLEANFILES+=valgrind.out *.log *.vglog* 
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
 
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_group_tests_soak \
+ stop_broker \
+ run_long_federation_sys_tests \
  run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
 
@@ -372,7 +378,8 @@ EXTRA_DIST+=						\
 	run_failover_soak				\
 	reliable_replication_test			\
 	federated_cluster_test_with_node_failure        \
-	sasl_test_setup.sh
+	sasl_test_setup.sh                              \
+	run_msg_group_tests_soak
 
 check-long:
 	$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=

Modified: qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessageReplayTracker.cpp Fri Oct 21 14:42:12 2011
@@ -51,7 +51,7 @@ class ReplayBufferChecker
 
 QPID_AUTO_TEST_CASE(testReplay)
 {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
 
     MessageReplayTracker tracker(10);
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testReplay)
 
 QPID_AUTO_TEST_CASE(testCheckCompletion)
 {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
 
     MessageReplayTracker tracker(10);

Modified: qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessagingFixture.h Fri Oct 21 14:42:12 2011
@@ -27,15 +27,19 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/Sender.h"
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
 
 namespace qpid {
 namespace tests {
 
+using qpid::types::Variant;
+
 struct BrokerAdmin
 {
     qpid::client::Connection connection;
@@ -223,6 +227,119 @@ inline void receive(messaging::Receiver&
     }
 }
 
+
+class MethodInvoker
+{   // Test helper: sends QMFv2 method requests addressed to the broker object and waits for each reply.
+  public:
+    MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+                                      sender(session.createSender("qmf.default.direct/broker")),
+                                      receiver(session.createReceiver(replyTo)) {}
+    /** Send a "create" request for an exchange called name, passing type as "exchange-type" plus the durable flag. */
+    void createExchange(const std::string& name, const std::string& type, bool durable=false)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="exchange";
+        params["properties"] = Variant::Map();
+        params["properties"].asMap()["exchange-type"] = type;
+        params["properties"].asMap()["durable"] = durable;
+        methodRequest("create", params);
+    }
+    /** Send a "delete" request for the exchange called name. */
+    void deleteExchange(const std::string& name)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="exchange";
+        methodRequest("delete", params);
+    }
+    /** Send a "create" request for a queue; options seed the properties map, then "durable" and "auto-delete" are set on top. */
+    void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
+                     const Variant::Map& options=Variant::Map())
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="queue";
+        params["properties"] = options;
+        params["properties"].asMap()["durable"] = durable;
+        params["properties"].asMap()["auto-delete"] = autodelete;
+        methodRequest("create", params);
+    }
+    /** Send a "delete" request for the queue called name. */
+    void deleteQueue(const std::string& name)
+    {
+        Variant::Map params;
+        params["name"]=name;
+        params["type"]="queue";
+        methodRequest("delete", params);
+    }
+    /** Send a "create" request for a binding named "exchange/queue/key", with options as its properties. */
+    void bind(const std::string& exchange, const std::string& queue, const std::string& key,
+                       const Variant::Map& options=Variant::Map())
+    {
+        Variant::Map params;
+        params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+        params["type"]="binding";
+        params["properties"] = options;
+        methodRequest("create", params);
+    }
+    /** Send a "delete" request for the binding named "exchange/queue/key". */
+    void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
+    {
+        Variant::Map params;
+        params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+        params["type"]="binding";
+        methodRequest("delete", params);
+    }
+    /** Send method with inParams; copy the reply's "_arguments" to *outParams if given; throw Exception on any other outcome. */
+    void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+    {
+        Variant::Map content;
+        Variant::Map objectId;
+        objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+        content["_object_id"] = objectId;
+        content["_method_name"] = method;
+        content["_arguments"] = inParams;
+        // Wrap the content in a message marked as a qmf2 "_method_request" whose reply-to is replyTo.
+        messaging::Message request;
+        request.setReplyTo(replyTo);
+        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
+        request.getProperties()["qmf.opcode"] = "_method_request";
+        encode(content, request);
+
+        sender.send(request);
+        // Wait up to Duration::SECOND*5 for a reply, then classify it by app-id and qmf.opcode.
+        messaging::Message response;
+        if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
+            if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
+                std::string opcode = response.getProperties()["qmf.opcode"];
+                if (opcode == "_method_response") {
+                    if (outParams) {
+                        Variant::Map m;
+                        decode(response, m);
+                        *outParams = m["_arguments"].asMap();
+                    }
+                } else if (opcode == "_exception") {
+                    Variant::Map m;
+                    decode(response, m);
+                    throw Exception(QPID_MSG("Error: " << m["_values"]));
+                } else {
+                    throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
+                }
+            } else {
+                throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
+                                         << response.getProperties()["x-amqp-0-10.app-id"]));
+            }
+        } else {
+            throw Exception(QPID_MSG("No response received"));
+        }
+    }
+  private:
+    messaging::Address replyTo;   // set as reply-to on each request; receiver is created on it
+    messaging::Sender sender;     // created on "qmf.default.direct/broker"
+    messaging::Receiver receiver; // fetches the reply to each request
+};
+
 }} // namespace qpid::tests
 
 #endif  /*!TESTS_MESSAGINGFIXTURE_H*/

Modified: qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/MessagingSessionTests.cpp Fri Oct 21 14:42:12 2011
@@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
     fix.admin.deleteQueue("q");
 }
 
+QPID_AUTO_TEST_CASE(testAssertExchangeOption)
+{   // Checks that the address 'assert' option is applied only to the link kinds it names (always / receiver / sender).
+    MessagingFixture fix;
+    std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}";
+    Sender s1 = fix.session.createSender(a1);
+    s1.close();
+    Receiver r1 = fix.session.createReceiver(a1);
+    r1.close();
+    // a2 names a different exchange type (fanout) but asserts for receivers only: the sender is created, the receiver must fail.
+    std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}";
+    Sender s2 = fix.session.createSender(a2);
+    s2.close();
+    BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed);
+    // a3 names a different qpid.msg_sequence value but asserts for senders only: the sender must fail, the receiver is created.
+    std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}";
+    BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed);
+    Receiver r3 = fix.session.createReceiver(a3);
+    r3.close();
+
+    fix.admin.deleteExchange("e");
+}
+
 QPID_AUTO_TEST_CASE(testGetSender)
 {
     QueueFixture fix;
@@ -890,6 +912,212 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
     BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
 }
 
+QPID_AUTO_TEST_CASE(testQmfCreateAndDelete)
+{   // Creates, binds, rebinds and deletes a queue and an exchange through MethodInvoker and checks the effect on routing.
+    MessagingFixture fix(Broker::Options(), true/*enable management*/);
+    MethodInvoker control(fix.session);
+    control.createQueue("my-queue");
+    control.createExchange("my-exchange", "topic");
+    control.bind("my-exchange", "my-queue", "subject1");
+    // A message with subject1 is expected to reach my-queue through the binding just created.
+    Sender sender = fix.session.createSender("my-exchange");
+    Receiver receiver = fix.session.createReceiver("my-queue");
+    Message out;
+    out.setSubject("subject1");
+    out.setContent("one");
+    sender.send(out);
+    Message in;
+    BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+    BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+    control.unbind("my-exchange", "my-queue", "subject1");
+    control.bind("my-exchange", "my-queue", "subject2");
+
+    out.setContent("two");
+    sender.send(out);//should be dropped
+
+    out.setSubject("subject2");
+    out.setContent("three");
+    sender.send(out);//should not be dropped
+
+    BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+    BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+    BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE));
+    sender.close();
+    receiver.close();
+    // After each delete, resolving the deleted name from a fresh session is expected to throw NotFound.
+    control.deleteExchange("my-exchange");
+    messaging::Session other = fix.connection.createSession();
+    {
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound);
+    }
+    control.deleteQueue("my-queue");
+    other = fix.connection.createSession();
+    {
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound);
+    }
+}
+
+QPID_AUTO_TEST_CASE(testRejectAndCredit)
+{
+    //Ensure credit is restored on completing rejected messages
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+
+    const uint count(10);
+    receiver.setCapacity(count);
+    for (uint i = 0; i < count; i++) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+    // Fetch the first batch (Message_1..Message_10) in order and reject each message.
+    Message in;
+    for (uint i = 0; i < count; ++i) {
+        if (receiver.fetch(in, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+            fix.session.reject(in);
+        } else {
+            BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str());
+            break;
+        }
+    }
+    //send another batch of messages; NOTE(review): i+count names them Message_10..Message_19, reusing Message_10 - confirm intended
+    for (uint i = 0; i < count; i++) {
+        sender.send(Message((boost::format("Message_%1%") % (i+count)).str()));
+    }
+    // The whole second batch should arrive; the test treats that as evidence that credit was restored.
+    for (uint i = 0; i < count; ++i) {
+        if (receiver.fetch(in, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str());
+        } else {
+            BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str());
+            break;
+        }
+    }
+    fix.session.acknowledge();
+    receiver.close();
+    sender.close();
+}
+
+QPID_AUTO_TEST_CASE(testTtlForever)
+{   // Checks that a message sent with a TTL of Duration::FOREVER is delivered and still reports that TTL.
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Message out("I want to live forever!");
+    out.setTtl(Duration::FOREVER);
+    sender.send(out, true);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(Duration::IMMEDIATE);
+    fix.session.acknowledge();
+    BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    BOOST_CHECK(in.getTtl() == Duration::FOREVER);
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{   // Checks that a second receiver on the same named link declared exclusive:true fails with a MessagingException.
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+    Sender sender = fix.session.createSender(fix.topic);
+    Receiver receiver1 = fix.session.createReceiver(address);
+    {
+        ScopedSuppressLogging sl;
+    try {
+        fix.session.createReceiver(address);
+        fix.session.sync(); // the test fails unless createReceiver or this sync throws
+        BOOST_FAIL("Expected exception.");
+    } catch (const MessagingException& /*e*/) {}
+    }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{   // Checks that two receivers can share the named link when exclusive:false: receiver1 fetches "one", then receiver2 fetches "two".
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+    Receiver receiver1 = fix.session.createReceiver(address);
+    Receiver receiver2 = fix.session.createReceiver(address);
+    Sender sender = fix.session.createSender(fix.topic);
+    sender.send(Message("one"), true);
+    Message in = receiver1.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+    sender.send(Message("two"), true);
+    in = receiver2.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+    fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{   // Checks that acknowledgeUpTo() settles only messages up to the one given, leaving the rest for a later session.
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    const uint count(20);
+    for (uint i = 0; i < count; ++i) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+    // Fetch all 20 messages on a second session, keeping them so one can be passed to acknowledgeUpTo().
+    Session other = fix.connection.createSession();
+    Receiver receiver = other.createReceiver(fix.queue);
+    std::vector<Message> messages;
+    for (uint i = 0; i < count; ++i) {
+        Message msg = receiver.fetch();
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+        messages.push_back(msg);
+    }
+    const uint batch = 10;
+    other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+    messages.clear();
+    other.sync();
+    other.close();
+    // The remaining 10 messages (Message_11..Message_20) are expected to be delivered to a fresh session.
+    other = fix.connection.createSession();
+    receiver = other.createReceiver(fix.queue);
+    Message msg;
+    for (uint i = 0; i < (count-batch); ++i) {
+        msg = receiver.fetch();
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+    }
+    other.acknowledgeUpTo(msg);
+    other.sync();
+    other.close();
+
+    Message m;
+    //check queue is empty
+    BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
+QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange)
+{   // Checks that x-bindings given on a sender address for amq.direct route a message with the bound subject to fix.queue.
+    QueueFixture fix;
+    Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str());
+    Message out("test-message");
+    out.setSubject("my-subject");
+    sender.send(out);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(Duration::SECOND * 5);
+    fix.session.acknowledge();
+    BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject());
+}
+
+QPID_AUTO_TEST_CASE(testUnsubscribeOnClose)
+{   // Checks that after the receiver is closed, a further message is received via amq.fanout, the declared alternate-exchange.
+    MessagingFixture fix;
+    Sender sender = fix.session.createSender("my-exchange/my-subject; {create: always, delete:sender, node:{type:topic, x-declare:{alternate-exchange:amq.fanout}}}");
+    Receiver receiver = fix.session.createReceiver("my-exchange/my-subject");
+    Receiver deadletters = fix.session.createReceiver("amq.fanout");
+    // While the receiver is open, "first" is delivered to it.
+    sender.send(Message("first"));
+    Message in = receiver.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("first"));
+    fix.session.acknowledge();
+    receiver.close();
+    sender.send(Message("second"));
+    in = deadletters.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("second"));
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/Qmf2.cpp Fri Oct 21 14:42:12 2011
@@ -23,12 +23,36 @@
 #include "qmf/QueryImpl.h"
 #include "qmf/SchemaImpl.h"
 #include "qmf/exceptions.h"
-
+#include "qpid/messaging/Connection.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/ConsoleSessionImpl.h"
 #include "unit_test.h"
 
+using namespace std;
 using namespace qpid::types;
+using namespace qpid::messaging;
 using namespace qmf;
 
+bool isReadable(int fd)
+{   // Returns true when select() reports fd as ready for reading; a zero or negative select() result yields false.
+    fd_set rfds;
+    struct timeval tv;
+    int nfds, result;
+    // Watch only fd for reading.
+    FD_ZERO(&rfds);
+    FD_SET(fd, &rfds);
+    nfds = fd + 1;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+    // The timeout is zero, so this is a poll rather than a wait.
+    result = select(nfds, &rfds, NULL, NULL, &tv);
+
+    return result > 0;
+}
+
 namespace qpid {
 namespace tests {
 
@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
     BOOST_CHECK_THROW(method.getArgument(3), QmfException);
 }
 
+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
+{   // Checks that constructing a posix::EventNotifier on an AgentSession leaves its impl with a non-null event notifier.
+    Connection connection("localhost");
+    AgentSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
+            
+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
+{   // Checks that constructing a posix::EventNotifier on a ConsoleSession leaves its impl with a non-null event notifier.
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+
+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testGetHandle)
+{   // Checks that a notifier built on a ConsoleSession reports a handle greater than zero.
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+
+    BOOST_CHECK(notifier.getHandle() > 0);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
+{   // Checks that isReadable() is false for the notifier's handle after setReadable(false).
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadable)
+{   // Checks that isReadable() is true for the notifier's handle after setReadable(true).
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
+{   // Checks that a single setReadable(false) leaves the handle unreadable even after 15 setReadable(true) calls.
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    posix::EventNotifier notifier(session);
+    for (int i = 0; i < 15; i++)
+        PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+    bool readable(isReadable(notifier.getHandle()));
+    BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testDeleteNotifier)
+{   // Checks that the session impl's event notifier is non-null while the EventNotifier is in scope and null afterwards.
+    Connection connection("localhost");
+    ConsoleSession session(connection, "");
+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+    {
+        posix::EventNotifier notifier(session);
+        BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+    }
+    BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueueEvents.cpp Fri Oct 21 14:42:12 2011
@@ -147,7 +147,7 @@ struct EventRecorder
 
 QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
 {
-    ProxySessionFixture fixture;
+    SessionFixture fixture;
     //register dummy event listener to broker
     EventRecorder listener;
     fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
@@ -194,7 +194,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEvent
 
 QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
 {
-    ProxySessionFixture fixture;
+    SessionFixture fixture;
     //register dummy event listener to broker
     EventRecorder listener;
     fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));

Modified: qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/QueuePolicyTest.cpp Fri Oct 21 14:42:12 2011
@@ -23,6 +23,7 @@
 #include "test_tools.h"
 
 #include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -38,6 +39,7 @@ namespace tests {
 
 QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
 
+namespace {
 QueuedMessage createMessage(uint32_t size)
 {
     QueuedMessage msg;
@@ -45,7 +47,7 @@ QueuedMessage createMessage(uint32_t siz
     MessageUtils::addContent(msg.payload, std::string (size, 'x'));
     return msg;
 }
-
+}
 
 QPID_AUTO_TEST_CASE(testCount)
 {
@@ -150,7 +152,7 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     for (int i = 0; i < 10; i++) {
@@ -185,7 +187,7 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
 
@@ -257,7 +259,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -283,7 +285,7 @@ QPID_AUTO_TEST_CASE(testPolicyWithDtx)
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-policy-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -340,8 +342,10 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
     //fallback to rejecting messages
     QueueOptions args;
     args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+    // Disable flow control, or else we'll never hit the max limit
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("my-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     LocalQueue incoming;
@@ -367,7 +371,7 @@ QPID_AUTO_TEST_CASE(testPolicyFailureOnC
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
     policy->update(args);
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("q");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     f.session.txSelect();
@@ -382,8 +386,9 @@ QPID_AUTO_TEST_CASE(testCapacityConversi
 {
     FieldTable args;
     args.setString("qpid.max_count", "5");
+    args.setString("qpid.flow_stop_count", "0");
 
-    ProxySessionFixture f;
+    SessionFixture f;
     std::string q("q");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
     for (int i = 0; i < 5; i++) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org