You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/12 23:34:24 UTC

svn commit: r835612 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/cmsutil/ main/activemq/commands/ main/activemq/core/ main/activemq/exceptions/ main/activemq/util/ test-integration/ test-integration/activemq/test/openwire/ test/act...

Author: tabish
Date: Thu Nov 12 22:34:23 2009
New Revision: 835612

URL: http://svn.apache.org/viewvc?rev=835612&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-240

Adds support for the Individual Acknowledge mode and fixes some outstanding issues with message acknowledgement in general.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp Thu Nov 12 22:34:23 2009
@@ -18,6 +18,7 @@
 #include "CmsAccessor.h"
 #include <activemq/exceptions/ExceptionDefines.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace activemq::cmsutil;
 using namespace activemq::exceptions;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp Thu Nov 12 22:34:23 2009
@@ -20,6 +20,7 @@
 #include "ResourceLifecycleManager.h"
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/exceptions/ExceptionDefines.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace activemq::cmsutil;
 using namespace activemq::exceptions;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,8 @@
 
 #include "ActiveMQBlobMessage.h"
 
+#include <activemq/util/CMSExceptionSupport.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::commands;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
  */
 #include <activemq/commands/ActiveMQBytesMessage.h>
 
+#include <activemq/util/CMSExceptionSupport.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::commands;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,7 @@
 #include <activemq/commands/ActiveMQDestination.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <activemq/util/URISupport.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 #include <activemq/commands/ActiveMQTopic.h>
 #include <activemq/commands/ActiveMQQueue.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,7 @@
  */
 #include <activemq/commands/ActiveMQMapMessage.h>
 #include <activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace std;
 using namespace decaf;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/wireformat/openwire/utils/MessagePropertyInterceptor.h>
 #include <activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 #include <cms/MessageNotReadableException.h>
 #include <cms/MessageNotWriteableException.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp Thu Nov 12 22:34:23 2009
@@ -18,6 +18,7 @@
 
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/core/ActiveMQConnection.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace std;
 using namespace activemq;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,7 @@
 #include <activemq/commands/ActiveMQTempQueue.h>
 
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace std;
 using namespace activemq;
@@ -71,3 +72,11 @@
 bool ActiveMQTempQueue::equals( const DataStructure* value ) const {
     return ActiveMQDestination::equals( value );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTempQueue::destroy() throw ( cms::CMSException ) {
+    try{
+        close();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h Thu Nov 12 22:34:23 2009
@@ -136,12 +136,7 @@
          * Destroy's the Temp Destination at the Broker
          * @throws CMSException
          */
-        virtual void destroy() throw ( cms::CMSException ) {
-            try{
-                close();
-            }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-        }
+        virtual void destroy() throw ( cms::CMSException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
  */
 #include <activemq/commands/ActiveMQTempTopic.h>
 
+#include <activemq/util/CMSExceptionSupport.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::commands;
@@ -69,3 +71,11 @@
 bool ActiveMQTempTopic::equals( const DataStructure* value ) const {
     return ActiveMQDestination::equals( value );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTempTopic::destroy() throw ( cms::CMSException ) {
+    try{
+        close();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h Thu Nov 12 22:34:23 2009
@@ -135,12 +135,7 @@
          * Destroy's the Temp Destination at the Broker
          * @throws CMSException
          */
-        virtual void destroy() throw ( cms::CMSException ) {
-            try{
-                close();
-            }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-        }
+        virtual void destroy() throw ( cms::CMSException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp Thu Nov 12 22:34:23 2009
@@ -20,7 +20,9 @@
 #include <decaf/io/ByteArrayOutputStream.h>
 #include <decaf/io/DataOutputStream.h>
 #include <decaf/io/DataInputStream.h>
+
 #include <activemq/wireformat/openwire/utils/OpenwireStringSupport.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 using namespace std;
 using namespace activemq;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
  */
 #include <activemq/commands/ActiveMQTopic.h>
 
+#include <activemq/util/CMSExceptionSupport.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::commands;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp Thu Nov 12 22:34:23 2009
@@ -43,7 +43,6 @@
 ////////////////////////////////////////////////////////////////////////////////
 Message::Message() : BaseCommand() {
 
-    this->ackHandler = NULL;
     this->readOnlyBody = false;
     this->readOnlyProperties = false;
     this->groupID = "";
@@ -236,7 +235,7 @@
     }
     stream << " Value of BrokerInTime = " << this->getBrokerInTime() << std::endl;
     stream << " Value of BrokerOutTime = " << this->getBrokerOutTime() << std::endl;
-    stream << " Value of ackHandler = " << ackHandler << std::endl;
+    stream << " Value of ackHandler = " << ackHandler.get() << std::endl;
     stream << " Value of properties = " << this->properties.toString() << std::endl;
     stream << " Value of readOnlyBody = " << this->readOnlyBody << std::endl;
     stream << " Value of readOnlyProperties = " << this->readOnlyBody << std::endl;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h Thu Nov 12 22:34:23 2009
@@ -31,6 +31,7 @@
 #include <activemq/commands/MessageId.h>
 #include <activemq/commands/ProducerId.h>
 #include <activemq/commands/TransactionId.h>
+#include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/util/Config.h>
 #include <activemq/util/PrimitiveMap.h>
 #include <decaf/lang/Pointer.h>
@@ -59,7 +60,7 @@
 
         // Used to allow a client to call Message::acknowledge when in the Client
         // Ack mode.
-        core::ActiveMQAckHandler* ackHandler;
+        Pointer<core::ActiveMQAckHandler> ackHandler;
 
         // Message properties, these are Marshaled and Unmarshaled from the Message
         // Command's marshaledProperties vector.
@@ -189,7 +190,7 @@
          * when the Acknowledge method is called.
          * @param handler ActiveMQAckHandler to call
          */
-        virtual void setAckHandler( core::ActiveMQAckHandler* handler ) {
+        virtual void setAckHandler( const Pointer<core::ActiveMQAckHandler>& handler ) {
             this->ackHandler = handler;
         }
 
@@ -198,7 +199,7 @@
          * when the Acknowledge method is called.
          * @returns handler ActiveMQAckHandler to call or NULL if not set
          */
-        virtual core::ActiveMQAckHandler* getAckHandler() const {
+        virtual Pointer<core::ActiveMQAckHandler> getAckHandler() const {
             return this->ackHandler;
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h Thu Nov 12 22:34:23 2009
@@ -19,9 +19,11 @@
 
 #include <cms/CMSException.h>
 #include <activemq/util/Config.h>
-#include <activemq/commands/Message.h>
 
 namespace activemq{
+namespace commands{
+    class Message;
+}
 namespace core{
 
     /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/exceptions/BrokerException.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 #include <decaf/lang/Boolean.h>
 #include <decaf/util/Iterator.h>
@@ -138,6 +139,7 @@
         sessionId->setConnectionId( connectionInfo->getConnectionId()->getValue() );
         sessionId->setValue( this->getNextSessionId() );
         sessionInfo->setSessionId( sessionId );
+        sessionInfo->setAckMode( ackMode );
 
         // Send the subscription message to the broker.
         syncRequest( sessionInfo );
@@ -395,7 +397,7 @@
         syncRequest( command );
     }
     AMQ_CATCH_RETHROW( NullPointerException )
-    AMQ_CATCH_RETHROW( IllegalStateException )
+    AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -423,7 +425,7 @@
         this->destroyDestination( amqDestination );
     }
     AMQ_CATCH_RETHROW( NullPointerException )
-    AMQ_CATCH_RETHROW( IllegalStateException )
+    AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Nov 12 22:34:23 2009
@@ -22,6 +22,7 @@
 #include <decaf/lang/Math.h>
 #include <decaf/lang/System.h>
 #include <activemq/util/Config.h>
+#include <activemq/util/CMSExceptionSupport.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/Message.h>
 #include <activemq/commands/MessageAck.h>
@@ -33,6 +34,7 @@
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/ActiveMQAckHandler.h>
 #include <cms/ExceptionListener.h>
 #include <memory>
 
@@ -50,6 +52,11 @@
 namespace activemq{
 namespace core {
 
+    /**
+     * Class used to deal with consumers in an active transaction.  This
+     * class calls back into the consumer when the transaction is Committed or
+     * Rolled Back to process that event.
+     */
     class TransactionSynhcronization : public Synchronization {
     private:
 
@@ -86,6 +93,11 @@
 
     };
 
+    /**
+     * Class used to Hook a consumer that has been closed into the Transaction
+     * it is currently a part of.  Once the Transaction has been Committed or
+     * Rolled back this Synchronization can finish the Close of the consumer.
+     */
     class CloseSynhcronization : public Synchronization {
     private:
 
@@ -118,6 +130,60 @@
 
     };
 
+    /**
+     * ActiveMQAckHandler used to support Client Acknowledge mode.
+     */
+    class ClientAckHandler : public ActiveMQAckHandler {
+    private:
+
+        ActiveMQSession* session;
+
+    public:
+
+        ClientAckHandler( ActiveMQSession* session ) {
+            this->session = session;
+        }
+
+        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+            throw ( cms::CMSException ) {
+
+            try {
+                this->session->acknowledge();
+            }
+            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+        }
+    };
+
+    /**
+     * ActiveMQAckHandler used to enable the Individual Acknowledge mode.
+     */
+    class IndividualAckHandler : public ActiveMQAckHandler {
+    private:
+
+        Pointer<commands::MessageDispatch> dispatch;
+        ActiveMQConsumer* consumer;
+
+    public:
+
+        IndividualAckHandler( ActiveMQConsumer* consumer, const Pointer<MessageDispatch>& dispatch ) {
+            this->consumer = consumer;
+            this->dispatch = dispatch;
+        }
+
+        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+            throw ( cms::CMSException ) {
+
+            try {
+
+                if( this->dispatch != NULL ) {
+                    this->consumer->acknowledge( this->dispatch );
+                    this->dispatch.reset( NULL );
+                }
+            }
+            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+        }
+    };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -462,42 +528,35 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
 
-    // If the Session is in ClientAcknowledge mode, then we set the
-    // handler in the message to this object and send it out.  Otherwise
-    // we ack it here for all the other Modes.
+    // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
+    // we set the handler in the message to this object and send it out.
     if( session->isClientAcknowledge() ) {
-
-        // Register ourself so that we can handle the Message's
-        // acknowledge method.
-        dispatch->getMessage()->setAckHandler( this );
+        Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
+        dispatch->getMessage()->setAckHandler( ackHandler );
+    } else if( session->isIndividualAcknowledge() ) {
+        Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
+        dispatch->getMessage()->setAckHandler( ackHandler );
     }
 
     this->lastDeliveredSequenceId =
         dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
 
-    synchronized( &dispatchedMessages ) {
-        dispatchedMessages.enqueueFront( dispatch );
-    }
-
-    // If the session is transacted then we hand off the message to it to
-    // be stored for later redelivery.  We do need to check and see if we
-    // are approaching the prefetch limit and send an Delivered ack just so
-    // we continue to receive messages, otherwise we'd stall.
-    if( session->isTransacted() ) {
+    if( !isAutoAcknowledgeBatch() ) {
 
-        if( transaction == NULL ) {
-            throw NullPointerException(
-                __FILE__, __LINE__,
-                "In a Transacted Session but no Transaction Context set." );
+        // When not in an Auto Acknowledge batch mode, track the message in dispatchedMessages.
+        synchronized( &dispatchedMessages ) {
+            dispatchedMessages.enqueueFront( dispatch );
         }
 
-        ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        if( this->session->isTransacted() ) {
+            ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
-                                               bool messageExpired AMQCPP_UNUSED ) {
+                                               bool messageExpired ) {
 
     try{
 
@@ -509,7 +568,9 @@
             ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
         }
 
-        if( session->isAutoAcknowledge() ) {
+        if( session->isTransacted() ) {
+            return;
+        } else if( isAutoAcknowledgeEach() ) {
 
             if( this->deliveringAcks.compareAndSet( false, true ) ) {
 
@@ -528,8 +589,12 @@
                 this->deliveringAcks.set( false );
             }
 
-        } else if( session->isClientAcknowledge() ) {
+        } else if( isAutoAcknowledgeBatch() ) {
+            ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
+        } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) {
             ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        } else {
+            throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -547,12 +612,16 @@
 
         if( this->deliveringAcks.compareAndSet( false, true ) ) {
 
-            if( this->session->isAutoAcknowledge() ) {
+            if( isAutoAcknowledgeEach() ) {
 
                 synchronized( &dispatchedMessages ) {
+
                     ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+
                     if( ack != NULL ) {
                         dispatchedMessages.clear();
+                    } else {
+                        ack.swap( pendingAck );
                     }
                 }
 
@@ -608,8 +677,14 @@
 
     if( oldPendingAck == NULL ) {
         pendingAck->setFirstMessageId( pendingAck->getLastMessageId() );
-    } else {
+    } else if ( oldPendingAck->getAckType() == pendingAck->getAckType() ) {
         pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+    } else {
+        // old pending ack being superseded by ack of another type, if it is not a delivered
+        // ack and hence important, send it now so it is not lost.
+        if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
+            session->oneway( oldPendingAck );
+        }
     }
 
     if( session->isTransacted() ) {
@@ -619,13 +694,8 @@
     if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( deliveredCounter - additionalWindowSize ) ) {
         session->oneway( pendingAck );
         pendingAck.reset( NULL );
-        additionalWindowSize = deliveredCounter;
-
-        // When using DUPS ok, we do a real ack.
-        if( ackType == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
-            deliveredCounter = 0;
-            additionalWindowSize = 0;
-        }
+        deliveredCounter = 0;
+        additionalWindowSize = 0;
     }
 }
 
@@ -654,15 +724,39 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+void ActiveMQConsumer::acknowledge( const Pointer<commands::MessageDispatch>& dispatch )
    throw ( cms::CMSException ) {
 
     try{
 
         this->checkClosed();
 
-        if( this->session->isClientAcknowledge() ) {
-            this->acknowledge();
+        if( this->session->isIndividualAcknowledge() ) {
+
+            Pointer<MessageAck> ack( new MessageAck() );
+            ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
+            ack->setConsumerId( this->consumerInfo->getConsumerId() );
+            ack->setDestination( this->consumerInfo->getDestination() );
+            ack->setMessageCount( 1 );
+            ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
+            ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
+
+            session->oneway( ack );
+
+            synchronized( &dispatchedMessages ) {
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->dispatchedMessages.iterator() );
+                while( iter->hasNext() ) {
+                    if( iter->next() == dispatch ) {
+                        iter->remove();
+                        break;
+                    }
+                }
+            }
+
+        } else {
+            throw IllegalStateException(
+                __FILE__, __LINE__,
+                "Session is not in IndividualAcknowledge mode." );
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -938,3 +1032,14 @@
     // dispatch thread is ready to flush the dispatch list
     clearDispatchList = true;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether acks should be sent on each dispatched message: true for an
+// auto-acknowledge session, or for a dups-ok session whose destination is a queue.
+bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
+
+    if( this->session->isAutoAcknowledge() ) {
+        return true;
+    }
+
+    if( !this->session->isDupsOkAcknowledge() ) {
+        return false;
+    }
+
+    return this->consumerInfo->getDestination()->isQueue();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether acks can be batched: true only for a dups-ok session whose
+// destination is not a queue.
+bool ActiveMQConsumer::isAutoAcknowledgeBatch() const {
+
+    if( !this->session->isDupsOkAcknowledge() ) {
+        return false;
+    }
+
+    return !this->consumerInfo->getDestination()->isQueue();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu Nov 12 22:34:23 2009
@@ -27,7 +27,6 @@
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/MessageAck.h>
 #include <activemq/commands/MessageDispatch.h>
-#include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/Dispatcher.h>
 #include <activemq/core/MessageDispatchChannel.h>
@@ -48,7 +47,6 @@
     class ActiveMQSession;
 
     class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
-                                        public ActiveMQAckHandler,
                                         public Dispatcher
     {
     private:
@@ -220,7 +218,7 @@
          * @param message the Message to Acknowledge
          * @throw CMSException
          */
-        virtual void acknowledgeMessage( const commands::Message* message )
+        virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch )
             throw ( cms::CMSException );
 
     public:  // Dispatcher Methods
@@ -286,7 +284,7 @@
          * Has this Consumer Transaction Synchronization been added to the transaction
          * @return true if the synchronization has been added.
          */
-        bool issynchronizationRegistered() const {
+        bool isSynchronizationRegistered() const {
             return this->synchronizationRegistered;
         }
 
@@ -305,7 +303,6 @@
          */
         bool iterate();
 
-
         /**
          * Forces this consumer to send all pending acks to the broker.
          */
@@ -394,6 +391,12 @@
         // Create an Ack Message that acks all messages that have been delivered so far.
         Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
 
+        // Should Acks be sent on each dispatched message
+        bool isAutoAcknowledgeEach() const;
+
+        // Can Acks be batched for less network overhead.
+        bool isAutoAcknowledgeBatch() const;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Thu Nov 12 22:34:23 2009
@@ -19,6 +19,7 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/commands/RemoveInfo.h>
+#include <activemq/util/CMSExceptionSupport.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQSessionExecutor.h>
 #include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/CMSExceptionSupport.h>
 
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/DestinationInfo.h>
@@ -237,6 +238,19 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+// Calls acknowledge() on every consumer returned by this->consumers.values().
+// The whole pass runs inside synchronized( &this->consumers ).
+void ActiveMQSession::acknowledge() {
+
+    synchronized( &this->consumers ) {
+
+        // Work from a copy of the consumer pointers taken at this point.
+        const std::vector< ActiveMQConsumer* > snapshot = this->consumers.values();
+
+        std::vector< ActiveMQConsumer* >::const_iterator cursor = snapshot.begin();
+        while( cursor != snapshot.end() ) {
+            (*cursor)->acknowledge();
+            ++cursor;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::deliverAcks() {
 
     synchronized( &this->consumers ) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Nov 12 22:34:23 2009
@@ -162,13 +162,19 @@
         bool isAutoAcknowledge() const {
             // True exactly when ackMode equals cms::Session::AUTO_ACKNOWLEDGE.
             return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
         }
+
         /**
          * @return true when this session's ackMode is cms::Session::DUPS_OK_ACKNOWLEDGE.
          */
         bool isDupsOkAcknowledge() const {
             return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
         }
+
         /**
          * @return true when this session's ackMode is cms::Session::CLIENT_ACKNOWLEDGE.
          */
         bool isClientAcknowledge() const {
             return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
         }
 
+        /**
+         * @return true when this session's ackMode is cms::Session::INDIVIDUAL_ACKNOWLEDGE.
+         */
+        bool isIndividualAcknowledge() const {
+            return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
+        }
+
         /**
          * Fires the given exception to the exception listener of the connection
          */
@@ -575,6 +581,12 @@
         void doStartTransaction() throw ( exceptions::ActiveMQException );
 
         /**
+         * Request that this Session tell all of its consumers to acknowledge all Messages
+         * that have been received so far.
+         */
+        void acknowledge();
+
+        /**
          * Request that this Session inform all of its consumers to deliver their pending
          * acks.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h Thu Nov 12 22:34:23 2009
@@ -75,29 +75,5 @@
         ex.setMark( __FILE__, __LINE__ ); \
     }
 
-/**
- * Macro for catching an exception of one type and then re-throwing
- * as a Basic CMSException, good for cases where the method isn't specific
- * about what CMS Exceptions are thrown, bad if you need to throw an
- * exception of MessageNotReadableException for instance.
- */
-#define AMQ_CATCH_ALL_THROW_CMSEXCEPTION() \
-    catch( cms::CMSException& ex ){ \
-        ex.setMark( __FILE__, __LINE__ ); \
-        throw ex; \
-    } catch( activemq::exceptions::ActiveMQException& ex ){ \
-        ex.setMark( __FILE__, __LINE__ ); \
-        throw ex.convertToCMSException(); \
-    } catch( decaf::lang::Exception& ex ){ \
-        ex.setMark( __FILE__, __LINE__ ); \
-        activemq::exceptions::ActiveMQException amqEx( ex ); \
-        throw amqEx.convertToCMSException(); \
-    } catch( std::exception& ex ){ \
-        throw cms::CMSException( ex.what(), NULL ); \
-    } catch(...) { \
-        throw cms::CMSException( "Caught Unknown Exception", NULL ); \
-    }
-
-
 
 #endif /*_ACTIVEMQ_EXCEPTIONS_EXCEPTIONDEFINES_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h Thu Nov 12 22:34:23 2009
@@ -21,8 +21,15 @@
 #include <activemq/util/Config.h>
 
 #include <cms/CMSException.h>
+#include <cms/CMSSecurityException.h>
 #include <cms/MessageEOFException.h>
 #include <cms/MessageFormatException.h>
+#include <cms/MessageNotReadableException.h>
+#include <cms/MessageNotWriteableException.h>
+#include <cms/InvalidClientIdException.h>
+#include <cms/InvalidDestinationException.h>
+#include <cms/InvalidSelectorException.h>
+#include <cms/IllegalStateException.h>
 
 #include <decaf/lang/Exception.h>
 
@@ -52,4 +59,54 @@
 
 }}
 
+/**
+ * Chain of catch clauses meant to follow directly after the closing brace of
+ * a try block, so that what leaves the enclosing function is a CMS exception.
+ *
+ * Exceptions from the cms hierarchy are marked with the current file and line
+ * and then re-thrown.  Each of those clauses uses a bare "throw;" so the object
+ * already in flight is propagated as it is; "throw ex;" would instead throw a
+ * copy built from the clause's static type, which turns any derived exception
+ * caught by the cms::CMSException clause into a plain cms::CMSException.
+ *
+ * activemq::exceptions::ActiveMQException and decaf::lang::Exception are marked
+ * and converted through convertToCMSException().  std::exception and anything
+ * else are replaced by a new cms::CMSException carrying a message only.
+ */
+#define AMQ_CATCH_ALL_THROW_CMSEXCEPTION() \
+    catch( cms::CMSSecurityException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::IllegalStateException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::InvalidClientIdException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::InvalidDestinationException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::InvalidSelectorException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::MessageEOFException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::MessageFormatException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::MessageNotReadableException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::MessageNotWriteableException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( cms::CMSException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw; \
+    } catch( activemq::exceptions::ActiveMQException& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw ex.convertToCMSException(); \
+    } catch( decaf::lang::Exception& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        activemq::exceptions::ActiveMQException amqEx( ex ); \
+        throw amqEx.convertToCMSException(); \
+    } catch( std::exception& ex ){ \
+        throw cms::CMSException( ex.what(), NULL ); \
+    } catch(...) { \
+        throw cms::CMSException( "Caught Unknown Exception", NULL ); \
+    }
+
+
 #endif /* _ACTIVEMQ_UTIL_CMSEXCEPTIONSUPPORT_H_ */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Thu Nov 12 22:34:23 2009
@@ -31,6 +31,7 @@
     activemq/test/openwire/OpenwireCmsTemplateTest.cpp \
     activemq/test/openwire/OpenwireDurableTest.cpp \
     activemq/test/openwire/OpenwireExpirationTest.cpp \
+    activemq/test/openwire/OpenwireIndividualAckTest.cpp \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.cpp \
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
     activemq/test/openwire/OpenwireSimpleTest.cpp \
@@ -70,6 +71,7 @@
     activemq/test/openwire/OpenwireCmsTemplateTest.h \
     activemq/test/openwire/OpenwireDurableTest.h \
     activemq/test/openwire/OpenwireExpirationTest.h \
+    activemq/test/openwire/OpenwireIndividualAckTest.h \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.h \
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \
     activemq/test/openwire/OpenwireSimpleTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Thu Nov 12 22:34:23 2009
@@ -19,6 +19,7 @@
 #include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
 #include "activemq/test/openwire/OpenwireDurableTest.h"
 #include "activemq/test/openwire/OpenwireExpirationTest.h"
+#include "activemq/test/openwire/OpenwireIndividualAckTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
 #include "activemq/test/openwire/OpenwireSimpleTest.h"
 #include "activemq/test/openwire/OpenwireTransactionTest.h"
@@ -42,6 +43,7 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp?rev=835612&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp Thu Nov 12 22:34:23 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireIndividualAckTest.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <cms/Session.h>
+
+#include <memory>
+
+using namespace cms;
+using namespace std;
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+// Empty constructor: nothing is initialised here; the tests obtain their
+// connection from this->cmsProvider.
+OpenwireIndividualAckTest::OpenwireIndividualAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Empty destructor: no cleanup is performed here.
+OpenwireIndividualAckTest::~OpenwireIndividualAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends one message, receives and acknowledges it, then checks that a consumer
+// on a fresh session receives nothing from the same queue.
+void OpenwireIndividualAckTest::testAckedMessageAreConsumed() {
+
+    // Timeout value handed to every receive() call in this test.
+    const int receiveTimeout = 1000;
+
+    Connection* conn = this->cmsProvider->getConnection();
+    conn->start();
+
+    // Individual-ack session with a producer bound to a temporary queue.
+    std::auto_ptr<Session> ackSession( conn->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+    std::auto_ptr<Destination> tempQueue( ackSession->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> sender( ackSession->createProducer( tempQueue.get() ) );
+
+    // Put a single message on the queue.
+    std::auto_ptr<TextMessage> msg1( ackSession->createTextMessage("Hello") );
+    sender->send( msg1.get() );
+
+    // Receive it and acknowledge it explicitly.
+    std::auto_ptr<MessageConsumer> receiver( ackSession->createConsumer( tempQueue.get() ) );
+    std::auto_ptr<Message> msg( receiver->receive( receiveTimeout ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    msg->acknowledge();
+
+    // Replace the session with a fresh one in the same ack mode.
+    ackSession->close();
+    ackSession.reset( conn->createSession( Session::INDIVIDUAL_ACKNOWLEDGE ) );
+
+    // The acknowledged message must not show up again.
+    receiver.reset( ackSession->createConsumer( tempQueue.get() ) );
+    msg.reset( receiver->receive( receiveTimeout ) );
+    CPPUNIT_ASSERT( msg.get() == NULL );
+
+    ackSession->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends three messages, receives all three but acknowledges only the last one,
+// then checks that a consumer on a fresh session is handed the first two
+// messages again, in order, and nothing after them.
+void OpenwireIndividualAckTest::testLastMessageAcked(){
+
+    Connection* connection = this->cmsProvider->getConnection();
+    connection->start();
+
+    std::auto_ptr<Session> session( connection->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+    std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+
+    std::auto_ptr<TextMessage> msg1( session->createTextMessage("msg1") );
+    std::auto_ptr<TextMessage> msg2( session->createTextMessage("msg2") );
+    std::auto_ptr<TextMessage> msg3( session->createTextMessage("msg3") );
+
+    producer->send( msg1.get() );
+    producer->send( msg2.get() );
+    producer->send( msg3.get() );
+
+    // Consume all three messages, acknowledging only the last one received.
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+    std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    msg.reset( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    msg.reset( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    msg->acknowledge();
+
+    // Reset the session.
+    session->close();
+    session.reset( connection->createSession( Session::INDIVIDUAL_ACKNOWLEDGE) );
+
+    // Attempt to consume the messages that were never acknowledged...
+    consumer.reset( session->createConsumer( queue.get() ) );
+    msg.reset( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+
+    // Check the cast result before using it, so that a message of another type
+    // is reported as a failed assertion instead of a NULL pointer dereference.
+    TextMessage* redelivered = dynamic_cast<TextMessage*>( msg.get() );
+    CPPUNIT_ASSERT( redelivered != NULL );
+    CPPUNIT_ASSERT( msg1->getText() == redelivered->getText() );
+
+    msg.reset( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    redelivered = dynamic_cast<TextMessage*>( msg.get() );
+    CPPUNIT_ASSERT( redelivered != NULL );
+    CPPUNIT_ASSERT( msg2->getText() == redelivered->getText() );
+
+    // The third message was acknowledged, so nothing else should arrive.
+    msg.reset( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( msg.get() == NULL );
+
+    session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends one message and receives it without acknowledging it, then checks that
+// a consumer on a fresh session receives a message from the same queue again.
+void OpenwireIndividualAckTest::testUnAckedMessageAreNotConsumedOnSessionClose() {
+
+    // Timeout values handed to the first and the second receive() call.
+    const int firstReceiveTimeout = 1000;
+    const int secondReceiveTimeout = 2000;
+
+    Connection* conn = this->cmsProvider->getConnection();
+    conn->start();
+
+    // Individual-ack session with a producer bound to a temporary queue.
+    std::auto_ptr<Session> ackSession( conn->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+    std::auto_ptr<Destination> tempQueue( ackSession->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> sender( ackSession->createProducer( tempQueue.get() ) );
+
+    std::auto_ptr<TextMessage> msg1( ackSession->createTextMessage("Hello") );
+    sender->send( msg1.get() );
+
+    // Take delivery of the message but deliberately leave it unacknowledged.
+    std::auto_ptr<MessageConsumer> receiver( ackSession->createConsumer( tempQueue.get() ) );
+    std::auto_ptr<Message> msg( receiver->receive( firstReceiveTimeout ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+
+    // Swap in a fresh session; the unacknowledged message is expected to be re-delivered.
+    ackSession->close();
+    ackSession.reset( conn->createSession( Session::INDIVIDUAL_ACKNOWLEDGE ) );
+
+    // A new consumer must be handed a message again; this time it is acknowledged.
+    receiver.reset( ackSession->createConsumer( tempQueue.get() ) );
+    msg.reset( receiver->receive( secondReceiveTimeout ) );
+    CPPUNIT_ASSERT( msg.get() != NULL );
+    msg->acknowledge();
+
+    ackSession->close();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h?rev=835612&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h Thu Nov 12 22:34:23 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    /**
+     * CppUnit fixture, derived from CMSTestFixture, that registers three tests
+     * exercising sessions created in cms::Session::INDIVIDUAL_ACKNOWLEDGE mode.
+     */
+    class OpenwireIndividualAckTest : public CMSTestFixture {
+
+        // Registers the three test methods below with the CppUnit suite.
+        CPPUNIT_TEST_SUITE( OpenwireIndividualAckTest );
+        CPPUNIT_TEST( testAckedMessageAreConsumed );
+        CPPUNIT_TEST( testLastMessageAcked );
+        CPPUNIT_TEST( testUnAckedMessageAreNotConsumedOnSessionClose );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireIndividualAckTest();
+        virtual ~OpenwireIndividualAckTest();
+
+        // Acknowledges a received message, then checks that a consumer on a
+        // new session receives nothing.
+        void testAckedMessageAreConsumed();
+
+        // Sends three messages, acknowledges only the third, then checks that a
+        // consumer on a new session receives the first two and nothing more.
+        void testLastMessageAcked();
+
+        // Receives a message without acknowledging it, then checks that a
+        // consumer on a new session receives a message again.
+        void testUnAckedMessageAreNotConsumedOnSessionClose();
+
+        // Returns the Openwire URL held by the IntegrationCommon singleton.
+        // NOTE(review): presumably read by CMSTestFixture when it connects - confirm.
+        virtual std::string getBrokerURL() const {
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+        }
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp Thu Nov 12 22:34:23 2009
@@ -22,20 +22,21 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::util;
+using namespace activemq::core;
 using namespace activemq::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQMessageTest::test()
 {
     ActiveMQMessage myMessage;
-    MyAckHandler ackHandler;
+    Pointer<MyAckHandler> ackHandler( new MyAckHandler() );
 
     CPPUNIT_ASSERT( myMessage.getDataStructureType() == ActiveMQMessage::ID_ACTIVEMQMESSAGE );
 
-    myMessage.setAckHandler( &ackHandler );
+    myMessage.setAckHandler( ackHandler );
     myMessage.acknowledge();
 
-    CPPUNIT_ASSERT( ackHandler.wasAcked == true );
+    CPPUNIT_ASSERT( ackHandler->wasAcked == true );
 
     CPPUNIT_ASSERT( myMessage.getPropertyNames().size() == 0 );
     CPPUNIT_ASSERT( myMessage.propertyExists( "something" ) == false );