You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/12 23:34:24 UTC
svn commit: r835612 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/cmsutil/ main/activemq/commands/ main/activemq/core/
main/activemq/exceptions/ main/activemq/util/ test-integration/
test-integration/activemq/test/openwire/ test/act...
Author: tabish
Date: Thu Nov 12 22:34:23 2009
New Revision: 835612
URL: http://svn.apache.org/viewvc?rev=835612&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-240
Adds the support for Individual Ack and fixes some outstanding issues with message Ack in general.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CmsAccessor.cpp Thu Nov 12 22:34:23 2009
@@ -18,6 +18,7 @@
#include "CmsAccessor.h"
#include <activemq/exceptions/ExceptionDefines.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace activemq::cmsutil;
using namespace activemq::exceptions;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/PooledSession.cpp Thu Nov 12 22:34:23 2009
@@ -20,6 +20,7 @@
#include "ResourceLifecycleManager.h"
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/ExceptionDefines.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace activemq::cmsutil;
using namespace activemq::exceptions;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBlobMessage.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,8 @@
#include "ActiveMQBlobMessage.h"
+#include <activemq/util/CMSExceptionSupport.h>
+
using namespace std;
using namespace activemq;
using namespace activemq::commands;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQBytesMessage.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
*/
#include <activemq/commands/ActiveMQBytesMessage.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
using namespace std;
using namespace activemq;
using namespace activemq::commands;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQDestination.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,7 @@
#include <activemq/commands/ActiveMQDestination.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <activemq/util/URISupport.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/commands/ActiveMQTopic.h>
#include <activemq/commands/ActiveMQQueue.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMapMessage.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,7 @@
*/
#include <activemq/commands/ActiveMQMapMessage.h>
#include <activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace std;
using namespace decaf;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/wireformat/openwire/utils/MessagePropertyInterceptor.h>
#include <activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <cms/MessageNotReadableException.h>
#include <cms/MessageNotWriteableException.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempDestination.cpp Thu Nov 12 22:34:23 2009
@@ -18,6 +18,7 @@
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace std;
using namespace activemq;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.cpp Thu Nov 12 22:34:23 2009
@@ -17,6 +17,7 @@
#include <activemq/commands/ActiveMQTempQueue.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace std;
using namespace activemq;
@@ -71,3 +72,11 @@
bool ActiveMQTempQueue::equals( const DataStructure* value ) const {
return ActiveMQDestination::equals( value );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTempQueue::destroy() throw ( cms::CMSException ) {
+ try{
+ close();
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempQueue.h Thu Nov 12 22:34:23 2009
@@ -136,12 +136,7 @@
* Destroy's the Temp Destination at the Broker
* @throws CMSException
*/
- virtual void destroy() throw ( cms::CMSException ) {
- try{
- close();
- }
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
- }
+ virtual void destroy() throw ( cms::CMSException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
*/
#include <activemq/commands/ActiveMQTempTopic.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
using namespace std;
using namespace activemq;
using namespace activemq::commands;
@@ -69,3 +71,11 @@
bool ActiveMQTempTopic::equals( const DataStructure* value ) const {
return ActiveMQDestination::equals( value );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTempTopic::destroy() throw ( cms::CMSException ) {
+ try{
+ close();
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTempTopic.h Thu Nov 12 22:34:23 2009
@@ -135,12 +135,7 @@
* Destroy's the Temp Destination at the Broker
* @throws CMSException
*/
- virtual void destroy() throw ( cms::CMSException ) {
- try{
- close();
- }
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
- }
+ virtual void destroy() throw ( cms::CMSException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTextMessage.cpp Thu Nov 12 22:34:23 2009
@@ -20,7 +20,9 @@
#include <decaf/io/ByteArrayOutputStream.h>
#include <decaf/io/DataOutputStream.h>
#include <decaf/io/DataInputStream.h>
+
#include <activemq/wireformat/openwire/utils/OpenwireStringSupport.h>
+#include <activemq/util/CMSExceptionSupport.h>
using namespace std;
using namespace activemq;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQTopic.cpp Thu Nov 12 22:34:23 2009
@@ -16,6 +16,8 @@
*/
#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
using namespace std;
using namespace activemq;
using namespace activemq::commands;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp Thu Nov 12 22:34:23 2009
@@ -43,7 +43,6 @@
////////////////////////////////////////////////////////////////////////////////
Message::Message() : BaseCommand() {
- this->ackHandler = NULL;
this->readOnlyBody = false;
this->readOnlyProperties = false;
this->groupID = "";
@@ -236,7 +235,7 @@
}
stream << " Value of BrokerInTime = " << this->getBrokerInTime() << std::endl;
stream << " Value of BrokerOutTime = " << this->getBrokerOutTime() << std::endl;
- stream << " Value of ackHandler = " << ackHandler << std::endl;
+ stream << " Value of ackHandler = " << ackHandler.get() << std::endl;
stream << " Value of properties = " << this->properties.toString() << std::endl;
stream << " Value of readOnlyBody = " << this->readOnlyBody << std::endl;
stream << " Value of readOnlyProperties = " << this->readOnlyBody << std::endl;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.h Thu Nov 12 22:34:23 2009
@@ -31,6 +31,7 @@
#include <activemq/commands/MessageId.h>
#include <activemq/commands/ProducerId.h>
#include <activemq/commands/TransactionId.h>
+#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/util/Config.h>
#include <activemq/util/PrimitiveMap.h>
#include <decaf/lang/Pointer.h>
@@ -59,7 +60,7 @@
// Used to allow a client to call Message::acknowledge when in the Client
// Ack mode.
- core::ActiveMQAckHandler* ackHandler;
+ Pointer<core::ActiveMQAckHandler> ackHandler;
// Message properties, these are Marshaled and Unmarshaled from the Message
// Command's marshaledProperties vector.
@@ -189,7 +190,7 @@
* when the Acknowledge method is called.
* @param handler ActiveMQAckHandler to call
*/
- virtual void setAckHandler( core::ActiveMQAckHandler* handler ) {
+ virtual void setAckHandler( const Pointer<core::ActiveMQAckHandler>& handler ) {
this->ackHandler = handler;
}
@@ -198,7 +199,7 @@
* when the Acknowledge method is called.
* @returns handler ActiveMQAckHandler to call or NULL if not set
*/
- virtual core::ActiveMQAckHandler* getAckHandler() const {
+ virtual Pointer<core::ActiveMQAckHandler> getAckHandler() const {
return this->ackHandler;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h Thu Nov 12 22:34:23 2009
@@ -19,9 +19,11 @@
#include <cms/CMSException.h>
#include <activemq/util/Config.h>
-#include <activemq/commands/Message.h>
namespace activemq{
+namespace commands{
+ class Message;
+}
namespace core{
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/BrokerException.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <decaf/lang/Boolean.h>
#include <decaf/util/Iterator.h>
@@ -138,6 +139,7 @@
sessionId->setConnectionId( connectionInfo->getConnectionId()->getValue() );
sessionId->setValue( this->getNextSessionId() );
sessionInfo->setSessionId( sessionId );
+ sessionInfo->setAckMode( ackMode );
// Send the subscription message to the broker.
syncRequest( sessionInfo );
@@ -395,7 +397,7 @@
syncRequest( command );
}
AMQ_CATCH_RETHROW( NullPointerException )
- AMQ_CATCH_RETHROW( IllegalStateException )
+ AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -423,7 +425,7 @@
this->destroyDestination( amqDestination );
}
AMQ_CATCH_RETHROW( NullPointerException )
- AMQ_CATCH_RETHROW( IllegalStateException )
+ AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Nov 12 22:34:23 2009
@@ -22,6 +22,7 @@
#include <decaf/lang/Math.h>
#include <decaf/lang/System.h>
#include <activemq/util/Config.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/Message.h>
#include <activemq/commands/MessageAck.h>
@@ -33,6 +34,7 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/ActiveMQAckHandler.h>
#include <cms/ExceptionListener.h>
#include <memory>
@@ -50,6 +52,11 @@
namespace activemq{
namespace core {
+ /**
+ * Class used to deal with consumers in an active transaction. This
+ * class calls back into the consumer when the transaction is Committed or
+ * Rolled Back to process that event.
+ */
class TransactionSynhcronization : public Synchronization {
private:
@@ -86,6 +93,11 @@
};
+ /**
+ * Class used to Hook a consumer that has been closed into the Transaction
+ * it is currently a part of. Once the Transaction has been Committed or
+ * Rolled back this Synchronization can finish the Close of the consumer.
+ */
class CloseSynhcronization : public Synchronization {
private:
@@ -118,6 +130,60 @@
};
+ /**
+ * ActiveMQAckHandler used to support Client Acknowledge mode.
+ */
+ class ClientAckHandler : public ActiveMQAckHandler {
+ private:
+
+ ActiveMQSession* session;
+
+ public:
+
+ ClientAckHandler( ActiveMQSession* session ) {
+ this->session = session;
+ }
+
+ void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+ throw ( cms::CMSException ) {
+
+ try {
+ this->session->acknowledge();
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+ }
+ };
+
+ /**
+ * ActiveMQAckHandler used to enable the Individual Acknowledge mode.
+ */
+ class IndividualAckHandler : public ActiveMQAckHandler {
+ private:
+
+ Pointer<commands::MessageDispatch> dispatch;
+ ActiveMQConsumer* consumer;
+
+ public:
+
+ IndividualAckHandler( ActiveMQConsumer* consumer, const Pointer<MessageDispatch>& dispatch ) {
+ this->consumer = consumer;
+ this->dispatch = dispatch;
+ }
+
+ void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+ throw ( cms::CMSException ) {
+
+ try {
+
+ if( this->dispatch != NULL ) {
+ this->consumer->acknowledge( this->dispatch );
+ this->dispatch.reset( NULL );
+ }
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+ }
+ };
+
}}
////////////////////////////////////////////////////////////////////////////////
@@ -462,42 +528,35 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
- // If the Session is in ClientAcknowledge mode, then we set the
- // handler in the message to this object and send it out. Otherwise
- // we ack it here for all the other Modes.
+ // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
+ // we set the handler in the message to this object and send it out.
if( session->isClientAcknowledge() ) {
-
- // Register ourself so that we can handle the Message's
- // acknowledge method.
- dispatch->getMessage()->setAckHandler( this );
+ Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
+ dispatch->getMessage()->setAckHandler( ackHandler );
+ } else if( session->isIndividualAcknowledge() ) {
+ Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
+ dispatch->getMessage()->setAckHandler( ackHandler );
}
this->lastDeliveredSequenceId =
dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
- synchronized( &dispatchedMessages ) {
- dispatchedMessages.enqueueFront( dispatch );
- }
-
- // If the session is transacted then we hand off the message to it to
- // be stored for later redelivery. We do need to check and see if we
- // are approaching the prefetch limit and send an Delivered ack just so
- // we continue to receive messages, otherwise we'd stall.
- if( session->isTransacted() ) {
+ if( !isAutoAcknowledgeBatch() ) {
- if( transaction == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__,
- "In a Transacted Session but no Transaction Context set." );
+ // When not in an Auto
+ synchronized( &dispatchedMessages ) {
+ dispatchedMessages.enqueueFront( dispatch );
}
- ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+ if( this->session->isTransacted() ) {
+ ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+ }
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
- bool messageExpired AMQCPP_UNUSED ) {
+ bool messageExpired ) {
try{
@@ -509,7 +568,9 @@
ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
}
- if( session->isAutoAcknowledge() ) {
+ if( session->isTransacted() ) {
+ return;
+ } else if( isAutoAcknowledgeEach() ) {
if( this->deliveringAcks.compareAndSet( false, true ) ) {
@@ -528,8 +589,12 @@
this->deliveringAcks.set( false );
}
- } else if( session->isClientAcknowledge() ) {
+ } else if( isAutoAcknowledgeBatch() ) {
+ ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
+ } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) {
ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+ } else {
+ throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -547,12 +612,16 @@
if( this->deliveringAcks.compareAndSet( false, true ) ) {
- if( this->session->isAutoAcknowledge() ) {
+ if( isAutoAcknowledgeEach() ) {
synchronized( &dispatchedMessages ) {
+
ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+
if( ack != NULL ) {
dispatchedMessages.clear();
+ } else {
+ ack.swap( pendingAck );
}
}
@@ -608,8 +677,14 @@
if( oldPendingAck == NULL ) {
pendingAck->setFirstMessageId( pendingAck->getLastMessageId() );
- } else {
+ } else if ( oldPendingAck->getAckType() == pendingAck->getAckType() ) {
pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+ } else {
+ // old pending ack being superseded by ack of another type, if is is not a delivered
+ // ack and hence important, send it now so it is not lost.
+ if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
+ session->oneway( oldPendingAck );
+ }
}
if( session->isTransacted() ) {
@@ -619,13 +694,8 @@
if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( deliveredCounter - additionalWindowSize ) ) {
session->oneway( pendingAck );
pendingAck.reset( NULL );
- additionalWindowSize = deliveredCounter;
-
- // When using DUPS ok, we do a real ack.
- if( ackType == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
- deliveredCounter = 0;
- additionalWindowSize = 0;
- }
+ deliveredCounter = 0;
+ additionalWindowSize = 0;
}
}
@@ -654,15 +724,39 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
+void ActiveMQConsumer::acknowledge( const Pointer<commands::MessageDispatch>& dispatch )
throw ( cms::CMSException ) {
try{
this->checkClosed();
- if( this->session->isClientAcknowledge() ) {
- this->acknowledge();
+ if( this->session->isIndividualAcknowledge() ) {
+
+ Pointer<MessageAck> ack( new MessageAck() );
+ ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
+ ack->setConsumerId( this->consumerInfo->getConsumerId() );
+ ack->setDestination( this->consumerInfo->getDestination() );
+ ack->setMessageCount( 1 );
+ ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
+ ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
+
+ session->oneway( ack );
+
+ synchronized( &dispatchedMessages ) {
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->dispatchedMessages.iterator() );
+ while( iter->hasNext() ) {
+ if( iter->next() == dispatch ) {
+ iter->remove();
+ break;
+ }
+ }
+ }
+
+ } else {
+ throw IllegalStateException(
+ __FILE__, __LINE__,
+ "Session is not in IndividualAcknowledge mode." );
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -938,3 +1032,14 @@
// dispatch thread is ready to flush the dispatch list
clearDispatchList = true;
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
+ return this->session->isAutoAcknowledge() ||
+ ( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::isAutoAcknowledgeBatch() const {
+ return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu Nov 12 22:34:23 2009
@@ -27,7 +27,6 @@
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessageDispatch.h>
-#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/core/MessageDispatchChannel.h>
@@ -48,7 +47,6 @@
class ActiveMQSession;
class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
- public ActiveMQAckHandler,
public Dispatcher
{
private:
@@ -220,7 +218,7 @@
* @param message the Message to Acknowledge
* @throw CMSException
*/
- virtual void acknowledgeMessage( const commands::Message* message )
+ virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch )
throw ( cms::CMSException );
public: // Dispatcher Methods
@@ -286,7 +284,7 @@
* Has this Consumer Transaction Synchronization been added to the transaction
* @return true if the synchronization has been added.
*/
- bool issynchronizationRegistered() const {
+ bool isSynchronizationRegistered() const {
return this->synchronizationRegistered;
}
@@ -305,7 +303,6 @@
*/
bool iterate();
-
/**
* Forces this consumer to send all pending acks to the broker.
*/
@@ -394,6 +391,12 @@
// Create an Ack Message that acks all messages that have been delivered so far.
Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
+ // Should Acks be sent on each dispatched message
+ bool isAutoAcknowledgeEach() const;
+
+ // Can Acks be batched for less network overhead.
+ bool isAutoAcknowledgeBatch() const;
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Thu Nov 12 22:34:23 2009
@@ -19,6 +19,7 @@
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/commands/RemoveInfo.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Nov 12 22:34:23 2009
@@ -24,6 +24,7 @@
#include <activemq/core/ActiveMQProducer.h>
#include <activemq/core/ActiveMQSessionExecutor.h>
#include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/DestinationInfo.h>
@@ -237,6 +238,19 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::acknowledge() {
+
+ synchronized( &this->consumers ) {
+ std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+
+ std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+ for( ; iter != consumers.end(); ++iter ) {
+ (*iter)->acknowledge();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::deliverAcks() {
synchronized( &this->consumers ) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Nov 12 22:34:23 2009
@@ -162,13 +162,19 @@
bool isAutoAcknowledge() const {
return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
}
+
bool isDupsOkAcknowledge() const {
return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
}
+
bool isClientAcknowledge() const {
return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
}
+ bool isIndividualAcknowledge() const {
+ return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
+ }
+
/**
* Fires the given exception to the exception listener of the connection
*/
@@ -575,6 +581,12 @@
void doStartTransaction() throw ( exceptions::ActiveMQException );
/**
+ * Request that the Session inform all its consumers to Acknowledge all Message's
+ * that have been received so far.
+ */
+ void acknowledge();
+
+ /**
* Request that this Session inform all of its consumers to deliver their pending
* acks.
*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h Thu Nov 12 22:34:23 2009
@@ -75,29 +75,5 @@
ex.setMark( __FILE__, __LINE__ ); \
}
-/**
- * Macro for catching an exception of one type and then re-throwing
- * as a Basic CMSException, good for cases where the method isn't specific
- * about what CMS Exceptions are thrown, bad if you need to throw an
- * exception of MessageNotReadableException for instance.
- */
-#define AMQ_CATCH_ALL_THROW_CMSEXCEPTION() \
- catch( cms::CMSException& ex ){ \
- ex.setMark( __FILE__, __LINE__ ); \
- throw ex; \
- } catch( activemq::exceptions::ActiveMQException& ex ){ \
- ex.setMark( __FILE__, __LINE__ ); \
- throw ex.convertToCMSException(); \
- } catch( decaf::lang::Exception& ex ){ \
- ex.setMark( __FILE__, __LINE__ ); \
- activemq::exceptions::ActiveMQException amqEx( ex ); \
- throw amqEx.convertToCMSException(); \
- } catch( std::exception& ex ){ \
- throw cms::CMSException( ex.what(), NULL ); \
- } catch(...) { \
- throw cms::CMSException( "Caught Unknown Exception", NULL ); \
- }
-
-
#endif /*_ACTIVEMQ_EXCEPTIONS_EXCEPTIONDEFINES_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/CMSExceptionSupport.h Thu Nov 12 22:34:23 2009
@@ -21,8 +21,15 @@
#include <activemq/util/Config.h>
#include <cms/CMSException.h>
+#include <cms/CMSSecurityException.h>
#include <cms/MessageEOFException.h>
#include <cms/MessageFormatException.h>
+#include <cms/MessageNotReadableException.h>
+#include <cms/MessageNotWriteableException.h>
+#include <cms/InvalidClientIdException.h>
+#include <cms/InvalidDestinationException.h>
+#include <cms/InvalidSelectorException.h>
+#include <cms/IllegalStateException.h>
#include <decaf/lang/Exception.h>
@@ -52,4 +59,54 @@
}}
+/**
+* Macro for catching an exception of one type and then re-throwing
+* as a Basic CMSException, good for cases where the method isn't specific
+* about what CMS Exceptions are thrown, bad if you need to throw an
+* exception of MessageNotReadableException for instance.
+*/
+#define AMQ_CATCH_ALL_THROW_CMSEXCEPTION() \
+ catch( cms::CMSSecurityException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::IllegalStateException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::InvalidClientIdException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::InvalidDestinationException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::InvalidSelectorException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::MessageEOFException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::MessageFormatException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::MessageNotReadableException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::MessageNotWriteableException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( cms::CMSException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex; \
+ } catch( activemq::exceptions::ActiveMQException& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ throw ex.convertToCMSException(); \
+ } catch( decaf::lang::Exception& ex ){ \
+ ex.setMark( __FILE__, __LINE__ ); \
+ activemq::exceptions::ActiveMQException amqEx( ex ); \
+ throw amqEx.convertToCMSException(); \
+ } catch( std::exception& ex ){ \
+ throw cms::CMSException( ex.what(), NULL ); \
+ } catch(...) { \
+ throw cms::CMSException( "Caught Unknown Exception", NULL ); \
+ }
+
#endif /* _ACTIVEMQ_UTIL_CMSEXCEPTIONSUPPORT_H_ */
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Thu Nov 12 22:34:23 2009
@@ -31,6 +31,7 @@
activemq/test/openwire/OpenwireCmsTemplateTest.cpp \
activemq/test/openwire/OpenwireDurableTest.cpp \
activemq/test/openwire/OpenwireExpirationTest.cpp \
+ activemq/test/openwire/OpenwireIndividualAckTest.cpp \
activemq/test/openwire/OpenwireJmsMessageGroupsTest.cpp \
activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
activemq/test/openwire/OpenwireSimpleTest.cpp \
@@ -70,6 +71,7 @@
activemq/test/openwire/OpenwireCmsTemplateTest.h \
activemq/test/openwire/OpenwireDurableTest.h \
activemq/test/openwire/OpenwireExpirationTest.h \
+ activemq/test/openwire/OpenwireIndividualAckTest.h \
activemq/test/openwire/OpenwireJmsMessageGroupsTest.h \
activemq/test/openwire/OpenwireSimpleRollbackTest.h \
activemq/test/openwire/OpenwireSimpleTest.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Thu Nov 12 22:34:23 2009
@@ -19,6 +19,7 @@
#include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
#include "activemq/test/openwire/OpenwireDurableTest.h"
#include "activemq/test/openwire/OpenwireExpirationTest.h"
+#include "activemq/test/openwire/OpenwireIndividualAckTest.h"
#include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
#include "activemq/test/openwire/OpenwireSimpleTest.h"
#include "activemq/test/openwire/OpenwireTransactionTest.h"
@@ -42,6 +43,7 @@
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp?rev=835612&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp Thu Nov 12 22:34:23 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireIndividualAckTest.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <cms/Session.h>
+
+#include <memory>
+
+using namespace cms;
+using namespace std;
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireIndividualAckTest::OpenwireIndividualAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireIndividualAckTest::~OpenwireIndividualAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireIndividualAckTest::testAckedMessageAreConsumed() {
+
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+
+ std::auto_ptr<TextMessage> msg1( session->createTextMessage("Hello") );
+ producer->send( msg1.get() );
+
+ // Consume the message...
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+ std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg->acknowledge();
+
+ // Reset the session.
+ session->close();
+ session.reset( connection->createSession( Session::INDIVIDUAL_ACKNOWLEDGE ) );
+
+ // Attempt to Consume the message...
+ consumer.reset( session->createConsumer( queue.get() ) );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+
+ session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireIndividualAckTest::testLastMessageAcked(){
+
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+
+ std::auto_ptr<TextMessage> msg1( session->createTextMessage("msg1") );
+ std::auto_ptr<TextMessage> msg2( session->createTextMessage("msg2") );
+ std::auto_ptr<TextMessage> msg3( session->createTextMessage("msg3") );
+
+ producer->send( msg1.get() );
+ producer->send( msg2.get() );
+ producer->send( msg3.get() );
+
+ // Consume the message...
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+ std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg->acknowledge();
+
+ // Reset the session->
+ session->close();
+ session.reset( connection->createSession( Session::INDIVIDUAL_ACKNOWLEDGE) );
+
+ // Attempt to Consume the message...
+ consumer.reset( session->createConsumer( queue.get() ) );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ CPPUNIT_ASSERT( msg1->getText() == dynamic_cast<TextMessage*>( msg.get() )->getText() );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ CPPUNIT_ASSERT( msg2->getText() == dynamic_cast<TextMessage*>( msg.get() )->getText() );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+
+ session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireIndividualAckTest::testUnAckedMessageAreNotConsumedOnSessionClose() {
+
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession( cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+
+ std::auto_ptr<TextMessage> msg1( session->createTextMessage("Hello") );
+ producer->send( msg1.get() );
+
+ // Consume the message...
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+ std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ // Don't ack the message.
+
+ // Reset the session-> This should cause the unacknowledged message to be re-delivered.
+ session->close();
+ session.reset( connection->createSession( Session::INDIVIDUAL_ACKNOWLEDGE ) );
+
+ // Attempt to Consume the message...
+ consumer.reset( session->createConsumer( queue.get() ) );
+ msg.reset( consumer->receive( 2000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg->acknowledge();
+
+ session->close();
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h?rev=835612&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h Thu Nov 12 22:34:23 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+ class OpenwireIndividualAckTest : public CMSTestFixture {
+
+ CPPUNIT_TEST_SUITE( OpenwireIndividualAckTest );
+ CPPUNIT_TEST( testAckedMessageAreConsumed );
+ CPPUNIT_TEST( testLastMessageAcked );
+ CPPUNIT_TEST( testUnAckedMessageAreNotConsumedOnSessionClose );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ OpenwireIndividualAckTest();
+ virtual ~OpenwireIndividualAckTest();
+
+ void testAckedMessageAreConsumed();
+ void testLastMessageAcked();
+ void testUnAckedMessageAreNotConsumedOnSessionClose();
+
+ virtual std::string getBrokerURL() const {
+ return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+ }
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREINDIVIDUALACKTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp?rev=835612&r1=835611&r2=835612&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQMessageTest.cpp Thu Nov 12 22:34:23 2009
@@ -22,20 +22,21 @@
using namespace std;
using namespace activemq;
using namespace activemq::util;
+using namespace activemq::core;
using namespace activemq::commands;
////////////////////////////////////////////////////////////////////////////////
void ActiveMQMessageTest::test()
{
ActiveMQMessage myMessage;
- MyAckHandler ackHandler;
+ Pointer<MyAckHandler> ackHandler( new MyAckHandler() );
CPPUNIT_ASSERT( myMessage.getDataStructureType() == ActiveMQMessage::ID_ACTIVEMQMESSAGE );
- myMessage.setAckHandler( &ackHandler );
+ myMessage.setAckHandler( ackHandler );
myMessage.acknowledge();
- CPPUNIT_ASSERT( ackHandler.wasAcked == true );
+ CPPUNIT_ASSERT( ackHandler->wasAcked == true );
CPPUNIT_ASSERT( myMessage.getPropertyNames().size() == 0 );
CPPUNIT_ASSERT( myMessage.propertyExists( "something" ) == false );