You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/02/03 16:22:16 UTC

svn commit: r617997 - in /activemq/activemq-cpp/trunk/src/main/activemq/connector: Connector.h openwire/OpenWireConnector.cpp openwire/OpenWireConnector.h stomp/StompConnector.cpp stomp/StompConnector.h

Author: tabish
Date: Sun Feb  3 07:22:15 2008
New Revision: 617997

URL: http://svn.apache.org/viewvc?rev=617997&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-161

Adding support in the connector for issuing a message pull command in the case where the prefetch size is set to zero.

Stomp does not support this, so an UnsupportedOperationException is thrown if pullMessage is called; the caller is expected to call isMessagePullSupported first to make sure it is safe.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Sun Feb  3 07:22:15 2008
@@ -356,6 +356,27 @@
         virtual void setExceptionListener(
             cms::ExceptionListener* listener ) = 0;
 
+        /**
+         * Checks if this connector supports pulling a new message from the service
+         * provider; if so, the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns true if pullMessage can be used without an UnsupportedOperationException
+         */
+        virtual bool isMessagePullSupported() const = 0;
+
+        /**
+         * Pulls a message from the service provider that this Connector is
+         * associated with. This could be because the service has a prefetch
+         * policy that is set to zero and therefore requires each message to
+         * be pulled from the server to the client via a poll.
+         * @param info - the consumer info for the consumer to pull for
+         * @param timeout - the time that the caller is going to wait for new messages
+         * @throw ConnectorException if a communications error occurs
+         * @throw UnsupportedOperationException if the connector can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sun Feb  3 07:22:15 2008
@@ -47,6 +47,7 @@
 #include <activemq/connector/openwire/commands/DestinationInfo.h>
 #include <activemq/connector/openwire/commands/ExceptionResponse.h>
 #include <activemq/connector/openwire/commands/Message.h>
+#include <activemq/connector/openwire/commands/MessagePull.h>
 #include <activemq/connector/openwire/commands/MessageAck.h>
 #include <activemq/connector/openwire/commands/MessageDispatch.h>
 #include <activemq/connector/openwire/commands/RemoveInfo.h>
@@ -1203,6 +1204,45 @@
         throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::pullMessage( connector::ConsumerInfo* info, long long timeout )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try {
+
+        OpenWireConsumerInfo* consumer =
+            dynamic_cast<OpenWireConsumerInfo*>( info );
+
+        // dynamic_cast yields NULL when info is NULL or is not an
+        // OpenWireConsumerInfo; report that as a connector error instead of
+        // dereferencing a NULL pointer below.
+        if( consumer == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::pullMessage - "
+                "Consumer was not of the OpenWire flavor." );
+        }
+
+        // An explicit pull is only issued for consumers whose prefetch size
+        // is zero; for any other prefetch size this call does nothing.
+        if( consumer->getConsumerInfo()->getPrefetchSize() == 0 ) {
+
+            commands::MessagePull messagePull;
+            messagePull.setConsumerId(
+                consumer->getConsumerInfo()->getConsumerId()->cloneDataStructure() );
+            messagePull.setDestination(
+                consumer->getConsumerInfo()->getDestination()->cloneDataStructure() );
+            messagePull.setTimeout( timeout );
+
+            // TODO - This should be Async
+            this->oneway( &messagePull );
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Sun Feb  3 07:22:15 2008
@@ -558,6 +558,29 @@
             this->exceptionListener = listener;
         }
 
+        /**
+         * Checks if this connector supports pulling a new message from the service
+         * provider; if so, the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns true always, this connector provides a pullMessage implementation
+         */
+        virtual bool isMessagePullSupported() const {
+            return true;
+        }
+
+        /**
+         * Pulls a message from the service provider that this Connector is
+         * associated with. The implementation only sends a MessagePull command
+         * when the consumer's prefetch size is zero; for any other prefetch
+         * size the call returns without sending anything.
+         * @param info - the consumer info for the consumer to pull for
+         * @param timeout - the time that the caller is going to wait for new messages
+         * @throw ConnectorException if a communications error occurs
+         * @throw UnsupportedOperationException if the connector can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
     public: // transport::CommandListener
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Sun Feb  3 07:22:15 2008
@@ -714,6 +714,21 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void StompConnector::pullMessage( connector::ConsumerInfo* info AMQCPP_UNUSED, long long timeout AMQCPP_UNUSED )
+    throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
+    // Stomp has no message pull: both arguments are unused and the call always throws.
+    try {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "StompConnector::pullMessage - No Stomp Support for Message Pull");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void StompConnector::closeResource( ConnectorResource* resource )
     throw ( ConnectorException ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=617997&r1=617996&r2=617997&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Sun Feb  3 07:22:15 2008
@@ -477,6 +477,29 @@
             this->exceptionListener = listener;
         }
 
+        /**
+         * Checks if this connector supports pulling a new message from the service
+         * provider; if so, the user can call pullMessage() on the Connector
+         * to try and get a new message added to the receive queue.
+         * @returns false always, there is no Stomp support for message pull
+         */
+        virtual bool isMessagePullSupported() const {
+            return false;
+        }
+
+        /**
+         * Pulls a message from the service provider that this Connector is
+         * associated with. Stomp has no support for message pull, so this
+         * implementation never pulls anything and always throws an
+         * UnsupportedOperationException; check isMessagePullSupported() first.
+         * @param info - the consumer info for the consumer to pull for (unused)
+         * @param timeout - the time the caller is going to wait for new messages (unused)
+         * @throw ConnectorException part of the Connector interface contract
+         * @throw UnsupportedOperationException always, since Stomp can't pull
+         */
+        virtual void pullMessage( connector::ConsumerInfo* info, long long timeout )
+            throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException );
+
     public: // transport::CommandListener
 
         /**