You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/25 17:51:12 UTC
svn commit: r1330362 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ./ kernels/
Author: tabish
Date: Wed Apr 25 15:51:11 2012
New Revision: 1330362
URL: http://svn.apache.org/viewvc?rev=1330362&view=rev
Log:
Adds handling of the consumer control command.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Apr 25 15:51:11 2012
@@ -316,7 +316,7 @@ ActiveMQConnection::ActiveMQConnection(c
transport->setTransportListener(this);
// Set the initial state of the ConnectionInfo
- configuration->connectionInfo->setManageable(false);
+ configuration->connectionInfo->setManageable(true);
configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
// Store of the transport and properties, the Connection now owns them.
@@ -797,6 +797,7 @@ void ActiveMQConnection::onCommand(const
message->setReadOnlyBody(true);
message->setReadOnlyProperties(true);
message->setRedeliveryCounter(dispatch->getRedeliveryCounter());
+ message->setConnection(this);
}
dispatcher->dispatch(dispatch);
@@ -821,6 +822,20 @@ void ActiveMQConnection::onCommand(const
} else if (command->isBrokerInfo()) {
this->config->brokerInfo = command.dynamicCast<BrokerInfo>();
this->config->brokerInfoReceived->countDown();
+ } else if (command->isConnectionControl()) {
+ this->onConnectionControl(command);
+ } else if (command->isControlCommand()) {
+ this->onControlCommand(command);
+ } else if (command->isConnectionError()) {
+
+ Pointer<ConnectionError> connectionError = command.dynamicCast<ConnectionError>();
+ Pointer<BrokerError> brokerError = connectionError->getException();
+ if (brokerError != NULL) {
+ this->onAsyncException(brokerError->createExceptionObject());
+ }
+
+ } else if (command->isConsumerControl()) {
+ this->onConsumerControl(command);
}
Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
@@ -837,6 +852,34 @@ void ActiveMQConnection::onCommand(const
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) {
+ // Don't need to do anything yet as close and shutdown are applicable yet.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onConnectionControl(Pointer<commands::Command> command AMQCPP_UNUSED) {
+ // Don't need to do anything yet as we don't support optimizeAcknowledge.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onConsumerControl(Pointer<commands::Command> command) {
+
+ Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
+
+ // Get the complete list of active sessions.
+ std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+
+ while (iter->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = iter->next();
+ if (consumerControl->isClose()) {
+ session->close(consumerControl->getConsumerId());
+ } else {
+ session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
try {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Apr 25 15:51:11 2012
@@ -719,6 +719,15 @@ namespace core{
// Allow subclasses to access the original Properties object for this connection.
const decaf::util::Properties& getProperties() const;
+ // Process the ControlCommand command
+ void onControlCommand(Pointer<commands::Command> command);
+
+ // Process the ConnectionControl command
+ void onConnectionControl(Pointer<commands::Command> command);
+
+ // Process the ConsumerControl command
+ void onConsumerControl(Pointer<commands::Command> command);
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Wed Apr 25 15:51:11 2012
@@ -1348,3 +1348,9 @@ decaf::lang::Exception* ActiveMQConsumer
return this->internal->failureError.get();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setPrefetchSize(int prefetchSize) {
+ deliverAcks();
+ this->consumerInfo->setCurrentPrefetchSize(prefetchSize);
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Wed Apr 25 15:51:11 2012
@@ -256,6 +256,12 @@ namespace kernels {
*/
decaf::lang::Exception* getFailureError() const;
+ /**
+ * Sets the current prefetch size for the consumer as indicated by a Broker
+ * ConsumerControl command.
+ */
+ void setPrefetchSize(int prefetchSize);
+
protected:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Wed Apr 25 15:51:11 2012
@@ -1135,6 +1135,32 @@ void ActiveMQSessionKernel::removeProduc
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) {
+
+ synchronized(&this->consumers) {
+ if (this->consumers.containsKey(id)) {
+ Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+ consumer->setPrefetchSize(prefetch);
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) {
+
+ synchronized(&this->consumers) {
+ if (this->consumers.containsKey(id)) {
+ Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+
+ try {
+ consumer->close();
+ } catch (cms::CMSException& e) {
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::doStartTransaction() {
if (this->isTransacted() && !this->transaction->isInXATransaction()) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1330362&r1=1330361&r2=1330362&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Wed Apr 25 15:51:11 2012
@@ -478,6 +478,25 @@ namespace kernels {
*/
void dispose();
+ /**
+ * Set the prefetch level for the given consumer if it exists in this Session to
+ * the value specified.
+ *
+ * @param id
+ * The consumer Id to search for and set prefetch level.
+ * @param prefetch
+ * The new prefetch value.
+ */
+ void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
+
+ /**
+ * Close the specified consumer if present in this Session.
+ *
+ * @param id
+ * The consumer Id to close.
+ */
+ void close(Pointer<commands::ConsumerId> id);
+
private:
/**