You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/25 17:51:12 UTC

svn commit: r1330362 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ./ kernels/

Author: tabish
Date: Wed Apr 25 15:51:11 2012
New Revision: 1330362

URL: http://svn.apache.org/viewvc?rev=1330362&view=rev
Log:
Adds handling of the consumer control command.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Apr 25 15:51:11 2012
@@ -316,7 +316,7 @@ ActiveMQConnection::ActiveMQConnection(c
     transport->setTransportListener(this);
 
     // Set the initial state of the ConnectionInfo
-    configuration->connectionInfo->setManageable(false);
+    configuration->connectionInfo->setManageable(true);
     configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
 
     // Store of the transport and properties, the Connection now owns them.
@@ -797,6 +797,7 @@ void ActiveMQConnection::onCommand(const
                         message->setReadOnlyBody(true);
                         message->setReadOnlyProperties(true);
                         message->setRedeliveryCounter(dispatch->getRedeliveryCounter());
+                        message->setConnection(this);
                     }
 
                     dispatcher->dispatch(dispatch);
@@ -821,6 +822,20 @@ void ActiveMQConnection::onCommand(const
         } else if (command->isBrokerInfo()) {
             this->config->brokerInfo = command.dynamicCast<BrokerInfo>();
             this->config->brokerInfoReceived->countDown();
+        } else if (command->isConnectionControl()) {
+            this->onConnectionControl(command);
+        } else if (command->isControlCommand()) {
+            this->onControlCommand(command);
+        } else if (command->isConnectionError()) {
+
+            Pointer<ConnectionError> connectionError = command.dynamicCast<ConnectionError>();
+            Pointer<BrokerError> brokerError = connectionError->getException();
+            if (brokerError != NULL) {
+                this->onAsyncException(brokerError->createExceptionObject());
+            }
+
+        } else if (command->isConsumerControl()) {
+            this->onConsumerControl(command);
         }
 
         Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
@@ -837,6 +852,34 @@ void ActiveMQConnection::onCommand(const
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) {
+    // Don't need to do anything yet as close and shutdown are applicable yet.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onConnectionControl(Pointer<commands::Command> command AMQCPP_UNUSED) {
+    // Don't need to do anything yet as we don't support optimizeAcknowledge.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onConsumerControl(Pointer<commands::Command> command) {
+
+    Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
+
+    // Get the complete list of active sessions.
+    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+
+    while (iter->hasNext()) {
+        Pointer<ActiveMQSessionKernel> session = iter->next();
+        if (consumerControl->isClose()) {
+            session->close(consumerControl->getConsumerId());
+        } else {
+            session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
 
     try {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Apr 25 15:51:11 2012
@@ -719,6 +719,15 @@ namespace core{
         // Allow subclasses to access the original Properties object for this connection.
         const decaf::util::Properties& getProperties() const;
 
+        // Process the ControlCommand command
+        void onControlCommand(Pointer<commands::Command> command);
+
+        // Process the ConnectionControl command
+        void onConnectionControl(Pointer<commands::Command> command);
+
+        // Process the ConsumerControl command
+        void onConsumerControl(Pointer<commands::Command> command);
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Wed Apr 25 15:51:11 2012
@@ -1348,3 +1348,9 @@ decaf::lang::Exception* ActiveMQConsumer
 
     return this->internal->failureError.get();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setPrefetchSize(int prefetchSize) {
+    deliverAcks();
+    this->consumerInfo->setCurrentPrefetchSize(prefetchSize);
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Wed Apr 25 15:51:11 2012
@@ -256,6 +256,12 @@ namespace kernels {
          */
         decaf::lang::Exception* getFailureError() const;
 
+        /**
+         * Sets the current prefetch size for the consumer as indicated by a Broker
+         * ConsumerControl command.
+         */
+        void setPrefetchSize(int prefetchSize);
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Wed Apr 25 15:51:11 2012
@@ -1135,6 +1135,32 @@ void ActiveMQSessionKernel::removeProduc
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) {
+
+    synchronized(&this->consumers) {
+        if (this->consumers.containsKey(id)) {
+            Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+            consumer->setPrefetchSize(prefetch);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) {
+
+    synchronized(&this->consumers) {
+        if (this->consumers.containsKey(id)) {
+            Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+
+            try {
+                consumer->close();
+            } catch (cms::CMSException& e) {
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::doStartTransaction() {
 
     if (this->isTransacted() && !this->transaction->isInXATransaction()) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Wed Apr 25 15:51:11 2012
@@ -478,6 +478,25 @@ namespace kernels {
          */
         void dispose();
 
+        /**
+         * Set the prefetch level for the given consumer if it exists in this Session to
+         * the value specified.
+         *
+         * @param id
+         *      The consumer Id to search for and set prefetch level.
+         * @param prefetch
+         *      The new prefetch value.
+         */
+        void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
+
+        /**
+         * Close the specified consumer if present in this Session.
+         *
+         * @param id
+         *      The consumer Id to close.
+         */
+        void close(Pointer<commands::ConsumerId> id);
+
    private:
 
        /**