You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/12/05 21:41:26 UTC
svn commit: r723859 - in /activemq/activemq-cpp/trunk/src:
main/activemq/connector/ main/activemq/connector/openwire/
main/activemq/connector/stomp/ main/activemq/core/
test-integration/activemq/test/openwire/
Author: tabish
Date: Fri Dec 5 12:41:25 2008
New Revision: 723859
URL: http://svn.apache.org/viewvc?rev=723859&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-169
Added a destination remove method to the ActiveMQConnection class that calls into the connectors to attempt the removal. For the time being, additional methods can be added to the Connection class as needed for other Broker-specific management actions.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Fri Dec 5 12:41:25 2008
@@ -53,18 +53,22 @@
protected:
// Flags the state we are in for connection to broker.
- enum ConnectionState
- {
+ enum ConnectionState {
CONNECTION_STATE_DISCONNECTED,
CONNECTION_STATE_ERROR,
CONNECTION_STATE_CONNECTING,
CONNECTION_STATE_CONNECTED
};
+ // Operation types passed to setOperationType() when sending the Destination Info Command.
+ enum DestinationActions {
+ DESTINATION_ADD_OPERATION = 0, // 0 asks the broker to add the destination
+ DESTINATION_REMOVE_OPERATION = 1 // 1 asks the broker to remove the destination
+ };
+
public: // Connector Types
- enum AckType
- {
+ enum AckType {
ACK_TYPE_DELIVERED = 0, // Message delivered but not consumed
ACK_TYPE_POISON = 1, // Message could not be processed due to
// poison pill but discard anyway
@@ -383,8 +387,9 @@
/**
* Pulls a message from the the service provider that this Connector is
* associated with. This could be because the service has a prefetch
- * policy that is set to zero and therefor requires each message to
+ * policy that is set to zero and therefore requires each message to
* be pulled from the server to the client via a poll.
+ *
* @param info - the consumer info for the consumer to pull for
* @param timeout - the time that the caller is going to wait for new messages
* @throw ConnectorException if a communications error occurs
@@ -393,6 +398,22 @@
virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+ /**
+ * Requests that the Broker remove a Destination, destroying all resources that
+ * have been associated with it. The Destination is removed and does not become
+ * valid again until a client creates a new Destination with that name again and
+ * sends a message that is bound to it.
+ *
+ * @param destination
+ * The Destination to Remove.
+ *
+ * @throw ConnectorException if a communications error occurs
+ *
+ * @throw UnsupportedOperationException if the connector does not support destroying Destinations
+ */
+ virtual void destroyDestination( const cms::Destination* destination )
+ throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Fri Dec 5 12:41:25 2008
@@ -1257,6 +1257,29 @@
}
////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::destroyDestination( const cms::Destination* destination )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try {
+
+        // The DestinationInfo command needs the OpenWire form of the destination.
+        // The cast yields NULL both for a NULL argument and for a cms::Destination
+        // that is not an ActiveMQDestination, so reject those cases here instead
+        // of dereferencing a NULL pointer further down.
+        const commands::ActiveMQDestination* amqDestination =
+            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+        if( amqDestination == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::destroyDestination - "
+                "Destination was NULL or not an ActiveMQDestination" );
+        }
+
+        // Build the remove request for this connection; the command is handed
+        // clones of the connection id and of the destination.
+        commands::DestinationInfo command;
+
+        command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() );
+        command.setOperationType( DESTINATION_REMOVE_OPERATION );
+        command.setDestination( amqDestination->cloneDataStructure() );
+
+        // Send the message to the broker.
+        syncRequest( &command );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void OpenWireConnector::closeResource( ConnectorResource* resource )
throw ( ConnectorException ) {
@@ -1540,7 +1563,7 @@
commands::DestinationInfo command;
command.setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
- command.setOperationType( 0 ); // 0 is add
+ command.setOperationType( DESTINATION_ADD_OPERATION );
command.setDestination( tempDestination->cloneDataStructure() );
// Send the message to the broker.
@@ -1563,7 +1586,7 @@
commands::DestinationInfo command;
command.setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
- command.setOperationType( 1 ); // 1 is remove
+ command.setOperationType( DESTINATION_REMOVE_OPERATION );
command.setDestination(
tempDestination->cloneDataStructure() );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Fri Dec 5 12:41:25 2008
@@ -636,6 +636,22 @@
virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+ /**
+ * Requests that the Broker remove a Destination, destroying all resources that
+ * have been associated with it. The Destination is removed and does not become
+ * valid again until a client creates a new Destination with that name again and
+ * sends a message that is bound to it.
+ *
+ * @param destination
+ * The Destination to Remove.
+ *
+ * @throw ConnectorException if a communications error occurs
+ *
+ * @throw UnsupportedOperationException if the connector does not support destroying Destinations
+ */
+ virtual void destroyDestination( const cms::Destination* destination )
+ throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
public: // transport::CommandListener
/**
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Fri Dec 5 12:41:25 2008
@@ -752,6 +752,21 @@
}
////////////////////////////////////////////////////////////////////////////////
+void StompConnector::destroyDestination( const cms::Destination* destination )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try {
+        // Always rejected: there is no Stomp support for destroying destinations,
+        // so the destination argument is never inspected.
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "StompConnector::destroyDestination - No Stomp Support for Destroying Destinations");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void StompConnector::closeResource( ConnectorResource* resource )
throw ( ConnectorException ) {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Fri Dec 5 12:41:25 2008
@@ -105,12 +105,12 @@
StompSessionManager* sessionManager;
/**
- * Next avaliable Producer Id
+ * Next available Producer Id
*/
util::LongSequenceGenerator producerIds;
/**
- * Next avaliable Transaction Id
+ * Next available Transaction Id
*/
util::LongSequenceGenerator transactionIds;
@@ -224,7 +224,7 @@
/**
* Creates a Session Info object for this connector
- * @param ackMode Acknowledgement Mode of the Session
+ * @param ackMode Acknowledgment Mode of the Session
* @returns Session Info Object
* @throws ConnectorException
*/
@@ -486,7 +486,7 @@
}
/**
- * Sets the Listner of exceptions for this connector
+ * Sets the Listener of exceptions for this connector
* @param listener ExceptionListener the observer.
*/
virtual void setExceptionListener(
@@ -507,7 +507,7 @@
/**
* Pulls a message from the the service provider that this Connector is
* associated with. This could be because the service has a prefetch
- * policy that is set to zero and therefor requires each message to
+ * policy that is set to zero and therefore requires each message to
* be pulled from the server to the client via a poll.
* @param info - the consumer info for the consumer to pull for
* @param timeout - the time that the caller is going to wait for new messages
@@ -517,6 +517,22 @@
virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+ /**
+ * Requests that the Broker remove a Destination, destroying all resources that
+ * have been associated with it. The Destination is removed and does not become
+ * valid again until a client creates a new Destination with that name again and
+ * sends a message that is bound to it.
+ *
+ * @param destination
+ * The Destination to Remove.
+ *
+ * @throw ConnectorException if a communications error occurs
+ *
+ * @throw UnsupportedOperationException always, as there is no Stomp support for destroying Destinations
+ */
+ virtual void destroyDestination( const cms::Destination* destination )
+ throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
public: // transport::CommandListener
/**
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Fri Dec 5 12:41:25 2008
@@ -20,19 +20,18 @@
#include <cms/Session.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConsumer.h>
-#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Boolean.h>
#include <decaf/util/Iterator.h>
+using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
-using namespace activemq::connector;
-using namespace activemq::exceptions;
-using namespace std;
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData) {
@@ -292,3 +291,32 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::destroyDestination( cms::Destination* destination )
+    throw( decaf::lang::exceptions::NullPointerException,
+           decaf::lang::exceptions::IllegalStateException,
+           decaf::lang::exceptions::UnsupportedOperationException,
+           activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        // Validate the argument and the connection state before the request
+        // is handed to the connector.
+        if( destination == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__, "Destination passed was NULL" );
+        }
+
+        if( this->isClosed() ) {
+            throw IllegalStateException(
+                __FILE__, __LINE__, "Connection Closed" );
+        }
+
+        // Ask the connector to perform a remove.
+        this->connectionData->getConnector()->destroyDestination( destination );
+    }
+    AMQ_CATCH_RETHROW( NullPointerException )
+    AMQ_CATCH_RETHROW( IllegalStateException )
+    // Rethrown as-is so that a connector without remove support surfaces the
+    // declared UnsupportedOperationException to the caller, rather than having
+    // it wrapped by the generic Exception conversion below.
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Fri Dec 5 12:41:25 2008
@@ -20,6 +20,7 @@
#include <cms/Connection.h>
#include <cms/ExceptionListener.h>
+#include <activemq/util/Config.h>
#include <activemq/core/ActiveMQConnectionData.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/Dispatcher.h>
@@ -28,7 +29,9 @@
#include <decaf/util/Properties.h>
#include <decaf/util/Map.h>
#include <decaf/util/Set.h>
-#include <activemq/util/Config.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
#include <string>
@@ -126,6 +129,38 @@
virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
throw ( exceptions::ActiveMQException );
+ /**
+ * Reports the current value of this connection's closed flag.
+ * @return true if the connection is closed, false otherwise
+ */
+ bool isClosed() const {
+ return closed;
+ }
+
+ /**
+ * Requests that the Broker removes the given Destination. Calling this
+ * method implies that the client is finished with the Destination and that
+ * no other messages will be sent or received for the given Destination. The
+ * Broker frees all resources it has associated with this Destination.
+ *
+ * @param destination
+ * The Destination the Broker will be requested to remove.
+ *
+ * @throws NullPointerException
+ * If the passed Destination is Null
+ * @throws IllegalStateException
+ * If the connection is closed.
+ * @throws UnsupportedOperationException
+ * If the wire format in use does not support this operation.
+ * @throws ActiveMQException
+ * If any other error occurs during the attempt to destroy the destination.
+ */
+ virtual void destroyDestination( cms::Destination* destination )
+ throw( decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IllegalStateException,
+ decaf::lang::exceptions::UnsupportedOperationException,
+ activemq::exceptions::ActiveMQException );
+
public: // Connection Interface Methods
/**
Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp Fri Dec 5 12:41:25 2008
@@ -18,6 +18,7 @@
#include "OpenwireSimpleTest.h"
#include <activemq/util/CMSListener.h>
+#include <activemq/core/ActiveMQConnection.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <decaf/util/UUID.h>
@@ -25,6 +26,7 @@
using namespace std;
using namespace cms;
using namespace activemq;
+using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::test::openwire;
using namespace activemq::util;
@@ -202,3 +204,46 @@
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testDestroyDestination() {
+ // Exercises ActiveMQConnection::destroyDestination: the first attempt must throw, the attempt after reconnectSession() must not.
+ try {
+
+ cmsProvider->setDestinationName( "testDestroyDestination" );
+ cmsProvider->reconnectSession();
+
+ // Fetch the CMS objects used for comms from the provider
+ cms::Session* session( cmsProvider->getSession() );
+ cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+ cms::MessageProducer* producer = cmsProvider->getProducer();
+ producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+ auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+
+ // Send a single text message and confirm the consumer receives it
+ producer->send( txtMessage.get() );
+
+ auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( message.get() != NULL );
+
+ ActiveMQConnection* connection =
+ dynamic_cast<ActiveMQConnection*>( cmsProvider->getConnection() );
+
+ CPPUNIT_ASSERT( connection != NULL ); // destroyDestination is only reachable through the ActiveMQConnection type
+
+ try{
+ connection->destroyDestination( cmsProvider->getDestination() ); // expected to throw: the destination should still be in use
+ CPPUNIT_ASSERT_MESSAGE( "Destination Should be in use.", false );
+ } catch( ActiveMQException& ex ) { // expected failure, nothing to do
+ }
+
+ cmsProvider->reconnectSession(); // presumably releases the consumer/producer so the destination is no longer in use - TODO confirm
+
+ connection->destroyDestination( cmsProvider->getDestination() ); // must succeed now; an ActiveMQException here fails the test below
+
+ } catch( ActiveMQException& ex ) {
+ ex.printStackTrace();
+ CPPUNIT_ASSERT_MESSAGE( "CAUGHT EXCEPTION", false );
+ } AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Fri Dec 5 12:41:25 2008
@@ -39,6 +39,7 @@
CPPUNIT_TEST( testWithZeroConsumerPrefetch );
CPPUNIT_TEST( testMapMessageSendToQueue );
CPPUNIT_TEST( testMapMessageSendToTopic );
+ CPPUNIT_TEST( testDestroyDestination );
CPPUNIT_TEST_SUITE_END();
public:
@@ -53,6 +54,7 @@
virtual void testWithZeroConsumerPrefetch();
virtual void testMapMessageSendToQueue();
virtual void testMapMessageSendToTopic();
+ virtual void testDestroyDestination();
};