You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/12/05 21:41:26 UTC

svn commit: r723859 - in /activemq/activemq-cpp/trunk/src: main/activemq/connector/ main/activemq/connector/openwire/ main/activemq/connector/stomp/ main/activemq/core/ test-integration/activemq/test/openwire/

Author: tabish
Date: Fri Dec  5 12:41:25 2008
New Revision: 723859

URL: http://svn.apache.org/viewvc?rev=723859&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-169

Added a destination remove method to the ActiveMQConnection class that calls into the connectors to attempt the removal.  For the time being, additional methods can be added to the Connection class as needed for other Broker-specific management actions.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
    activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Fri Dec  5 12:41:25 2008
@@ -53,18 +53,22 @@
     protected:
 
         // Flags the state we are in for connection to broker.
-        enum ConnectionState
-        {
+        enum ConnectionState {
             CONNECTION_STATE_DISCONNECTED,
             CONNECTION_STATE_ERROR,
             CONNECTION_STATE_CONNECTING,
             CONNECTION_STATE_CONNECTED
         };
 
+        // Flags to be applied when sending the Destination Info Command.
+        enum DestinationActions {
+            DESTINATION_ADD_OPERATION = 0,
+            DESTINATION_REMOVE_OPERATION = 1
+        };
+
     public:    // Connector Types
 
-        enum AckType
-        {
+        enum AckType {
             ACK_TYPE_DELIVERED = 0,  // Message delivered but not consumed
             ACK_TYPE_POISON    = 1,  // Message could not be processed due to
                                      // poison pill but discard anyway
@@ -383,8 +387,9 @@
         /**
          * Pulls a message from the the service provider that this Connector is
          * associated with. This could be because the service has a prefetch
-         * policy that is set to zero and therefor requires each message to
+         * policy that is set to zero and therefore requires each message to
          * be pulled from the server to the client via a poll.
+         *
          * @param info - the consumer info for the consumer to pull for
          * @param timeout - the time that the caller is going to wait for new messages
          * @throw ConnectorException if a communications error occurs
@@ -393,6 +398,22 @@
         virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
             throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0;
 
+        /**
+         * Requests that the Broker remove a Destination, destroying all resources that
+         * have been associated with it.  The Destination is removed and does not become
+         * valid again until a client creates a new Destination with that name again and
+         * sends a message that is bound to it.
+         *
+         * @param destination
+         *        The Destination to Remove.
+         *
+         * @throw ConnectorException if a communications error occurs
+         *
+         * @throw UnsupportedOperationException if the connector can't remove Destinations
+         */
+        virtual void destroyDestination( const cms::Destination* destination )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Fri Dec  5 12:41:25 2008
@@ -1257,6 +1257,29 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::destroyDestination( const cms::Destination* destination )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try {
+
+        const commands::ActiveMQDestination* amqDestination =
+            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+        commands::DestinationInfo command;
+
+        command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() );
+        command.setOperationType( DESTINATION_REMOVE_OPERATION );
+        command.setDestination( amqDestination->cloneDataStructure() );
+
+        // Send the message to the broker.
+        syncRequest( &command );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenWireConnector::closeResource( ConnectorResource* resource )
     throw ( ConnectorException ) {
 
@@ -1540,7 +1563,7 @@
         commands::DestinationInfo command;
         command.setConnectionId(
             connectionInfo.getConnectionId()->cloneDataStructure() );
-        command.setOperationType( 0 ); // 0 is add
+        command.setOperationType( DESTINATION_ADD_OPERATION );
         command.setDestination( tempDestination->cloneDataStructure() );
 
         // Send the message to the broker.
@@ -1563,7 +1586,7 @@
         commands::DestinationInfo command;
         command.setConnectionId(
             connectionInfo.getConnectionId()->cloneDataStructure() );
-        command.setOperationType( 1 ); // 1 is remove
+        command.setOperationType( DESTINATION_REMOVE_OPERATION );
         command.setDestination(
             tempDestination->cloneDataStructure() );
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Fri Dec  5 12:41:25 2008
@@ -636,6 +636,22 @@
         virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
             throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
 
+        /**
+         * Requests that the Broker remove a Destination, destroying all resources that
+         * have been associated with it.  The Destination is removed and does not become
+         * valid again until a client creates a new Destination with that name again and
+         * sends a message that is bound to it.
+         *
+         * @param destination
+         *        The Destination to Remove.
+         *
+         * @throw ConnectorException if a communications error occurs
+         *
+         * @throw UnsupportedOperationException if the connector can't remove Destinations
+         */
+        virtual void destroyDestination( const cms::Destination* destination )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
     public: // transport::CommandListener
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Fri Dec  5 12:41:25 2008
@@ -752,6 +752,21 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void StompConnector::destroyDestination( const cms::Destination* destination )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "StompConnector::destroyDestination - No Stomp Support for Destroying Destinations");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void StompConnector::closeResource( ConnectorResource* resource )
     throw ( ConnectorException ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Fri Dec  5 12:41:25 2008
@@ -105,12 +105,12 @@
         StompSessionManager* sessionManager;
 
         /**
-         * Next avaliable Producer Id
+         * Next available Producer Id
          */
         util::LongSequenceGenerator producerIds;
 
         /**
-         * Next avaliable Transaction Id
+         * Next available Transaction Id
          */
         util::LongSequenceGenerator transactionIds;
 
@@ -224,7 +224,7 @@
 
         /**
          * Creates a Session Info object for this connector
-         * @param ackMode Acknowledgement Mode of the Session
+         * @param ackMode Acknowledgment Mode of the Session
          * @returns Session Info Object
          * @throws ConnectorException
          */
@@ -486,7 +486,7 @@
         }
 
         /**
-         * Sets the Listner of exceptions for this connector
+         * Sets the Listener of exceptions for this connector
          * @param listener ExceptionListener the observer.
          */
         virtual void setExceptionListener(
@@ -507,7 +507,7 @@
         /**
          * Pulls a message from the the service provider that this Connector is
          * associated with. This could be because the service has a prefetch
-         * policy that is set to zero and therefor requires each message to
+         * policy that is set to zero and therefore requires each message to
          * be pulled from the server to the client via a poll.
          * @param info - the consumer info for the consumer to pull for
          * @param timeout - the time that the caller is going to wait for new messages
@@ -517,6 +517,22 @@
         virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout )
             throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
 
+        /**
+         * Requests that the Broker remove a Destination, destroying all resources that
+         * have been associated with it.  The Destination is removed and does not become
+         * valid again until a client creates a new Destination with that name again and
+         * sends a message that is bound to it.
+         *
+         * @param destination
+         *        The Destination to Remove.
+         *
+         * @throw ConnectorException if a communications error occurs
+         *
+         * @throw UnsupportedOperationException if the connector can't remove Destinations
+         */
+        virtual void destroyDestination( const cms::Destination* destination )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
     public: // transport::CommandListener
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Fri Dec  5 12:41:25 2008
@@ -20,19 +20,18 @@
 #include <cms/Session.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConsumer.h>
-#include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/Boolean.h>
 #include <decaf/util/Iterator.h>
 
+using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
 using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
-using namespace activemq::connector;
-using namespace activemq::exceptions;
-using namespace std;
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData) {
@@ -292,3 +291,32 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::destroyDestination( cms::Destination* destination )
+    throw( decaf::lang::exceptions::NullPointerException,
+           decaf::lang::exceptions::IllegalStateException,
+           decaf::lang::exceptions::UnsupportedOperationException,
+           activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        if( destination == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__, "Destination passed was NULL" );
+        }
+
+        if( this->isClosed() ) {
+            throw IllegalStateException(
+                __FILE__, __LINE__, "Connection Closed" );
+        }
+
+        // Ask the connector to perform a remove.
+        this->connectionData->getConnector()->destroyDestination( destination );
+    }
+    AMQ_CATCH_RETHROW( NullPointerException )
+    AMQ_CATCH_RETHROW( IllegalStateException )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Fri Dec  5 12:41:25 2008
@@ -20,6 +20,7 @@
 
 #include <cms/Connection.h>
 #include <cms/ExceptionListener.h>
+#include <activemq/util/Config.h>
 #include <activemq/core/ActiveMQConnectionData.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/core/Dispatcher.h>
@@ -28,7 +29,9 @@
 #include <decaf/util/Properties.h>
 #include <decaf/util/Map.h>
 #include <decaf/util/Set.h>
-#include <activemq/util/Config.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
 
 #include <string>
 
@@ -126,6 +129,38 @@
         virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
             throw ( exceptions::ActiveMQException );
 
+        /**
+         * Checks if this connection has been closed
+         * @return true if the connection is closed
+         */
+        bool isClosed() const {
+            return this->closed;
+        }
+
+        /**
+         * Requests that the Broker removes the given Destination.  Calling this
+         * method implies that the client is finished with the Destination and that
+         * no other messages will be sent or received for the given Destination.  The
+         * Broker frees all resources it has associated with this Destination.
+         *
+         * @param destination
+         *        The Destination the Broker will be requested to remove.
+         *
+         * @throws NullPointerException
+         *         If the passed Destination is Null
+         * @throws IllegalStateException
+         *         If the connection is closed.
+         * @throws UnsupportedOperationException
+         *         If the wire format in use does not support this operation.
+         * @throws ActiveMQException
+         *         If any other error occurs during the attempt to destroy the destination.
+         */
+        virtual void destroyDestination( cms::Destination* destination )
+            throw( decaf::lang::exceptions::NullPointerException,
+                   decaf::lang::exceptions::IllegalStateException,
+                   decaf::lang::exceptions::UnsupportedOperationException,
+                   activemq::exceptions::ActiveMQException );
+
     public:   // Connection Interface Methods
 
         /**

Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp Fri Dec  5 12:41:25 2008
@@ -18,6 +18,7 @@
 #include "OpenwireSimpleTest.h"
 
 #include <activemq/util/CMSListener.h>
+#include <activemq/core/ActiveMQConnection.h>
 #include <activemq/exceptions/ActiveMQException.h>
 
 #include <decaf/util/UUID.h>
@@ -25,6 +26,7 @@
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::core;
 using namespace activemq::test;
 using namespace activemq::test::openwire;
 using namespace activemq::util;
@@ -202,3 +204,46 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testDestroyDestination() {
+
+    try {
+
+        cmsProvider->setDestinationName( "testDestroyDestination" );
+        cmsProvider->reconnectSession();
+
+        // Create CMS Object for Comms
+        cms::Session* session( cmsProvider->getSession() );
+        cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+        cms::MessageProducer* producer = cmsProvider->getProducer();
+        producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+        auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+
+        // Send some text messages
+        producer->send( txtMessage.get() );
+
+        auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
+        CPPUNIT_ASSERT( message.get() != NULL );
+
+        ActiveMQConnection* connection =
+            dynamic_cast<ActiveMQConnection*>( cmsProvider->getConnection() );
+
+        CPPUNIT_ASSERT( connection != NULL );
+
+        try{
+            connection->destroyDestination( cmsProvider->getDestination() );
+            CPPUNIT_ASSERT_MESSAGE( "Destination Should be in use.", false );
+        } catch( ActiveMQException& ex ) {
+        }
+
+        cmsProvider->reconnectSession();
+
+        connection->destroyDestination( cmsProvider->getDestination() );
+
+    } catch( ActiveMQException& ex ) {
+        ex.printStackTrace();
+        CPPUNIT_ASSERT_MESSAGE( "CAUGHT EXCEPTION", false );
+    } AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h?rev=723859&r1=723858&r2=723859&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Fri Dec  5 12:41:25 2008
@@ -39,6 +39,7 @@
         CPPUNIT_TEST( testWithZeroConsumerPrefetch );
         CPPUNIT_TEST( testMapMessageSendToQueue );
         CPPUNIT_TEST( testMapMessageSendToTopic );
+        CPPUNIT_TEST( testDestroyDestination );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -53,6 +54,7 @@
         virtual void testWithZeroConsumerPrefetch();
         virtual void testMapMessageSendToQueue();
         virtual void testMapMessageSendToTopic();
+        virtual void testDestroyDestination();
 
     };