You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/11 21:30:34 UTC
svn commit: r517008 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector:
openwire/OpenWireConnector.cpp openwire/OpenWireConnector.h
stomp/StompSessionManager.cpp
Author: nmittler
Date: Sun Mar 11 13:30:33 2007
New Revision: 517008
URL: http://svn.apache.org/viewvc?view=rev&rev=517008
Log:
[AMQCPP-30] Cleaning up improper usage of uri parameters
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=517008&r1=517007&r2=517008
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sun Mar 11 13:30:33 2007
@@ -384,6 +384,11 @@
consumerInfo->setSelector( selector );
consumerInfo->setNoLocal( noLocal );
+
+ /**
+ * Override default options with uri-encoded parameters.
+ */
+ applyDestinationOptions( consumerInfo );
// Send the message to the broker.
Response* response = syncRequest(consumerInfo);
@@ -441,6 +446,11 @@
consumerInfo->setSelector( selector );
consumerInfo->setNoLocal( noLocal );
consumerInfo->setSubscriptionName( name );
+
+ /**
+ * Override default options with uri-encoded parameters.
+ */
+ applyDestinationOptions( consumerInfo );
// Send the message to the broker.
Response* response = syncRequest(consumerInfo);
@@ -467,6 +477,138 @@
}
////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::applyDestinationOptions( commands::ConsumerInfo* info )
+{
+ const commands::ActiveMQDestination* amqDestination = info->getDestination();
+
+ // Get any options specified in the destination and apply them to the
+ // ConsumerInfo object.
+ const Properties& options = amqDestination->getOptions();
+
+ std::string noLocalStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_NOLOCAL );
+ if( options.hasProperty( noLocalStr ) )
+ {
+ info->setNoLocal(
+ Boolean::parseBoolean(
+ options.getProperty( noLocalStr ) ) );
+ }
+
+ std::string selectorStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_SELECTOR );
+
+ if( options.hasProperty( selectorStr ) )
+ {
+ info->setSelector(
+ options.getProperty( selectorStr ) );
+ }
+
+ std::string priorityStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_PRIORITY );
+
+ if( options.hasProperty( priorityStr ) )
+ {
+ info->setPriority(
+ Integer::parseInt(
+ options.getProperty( priorityStr ) ) );
+ }
+
+ std::string dispatchAsyncStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
+
+ if( options.hasProperty( dispatchAsyncStr ) )
+ {
+ info->setDispatchAsync(
+ Boolean::parseBoolean(
+ options.getProperty( dispatchAsyncStr ) ) );
+ }
+
+ std::string exclusiveStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
+
+ if( options.hasProperty( exclusiveStr ) )
+ {
+ info->setExclusive(
+ Boolean::parseBoolean(
+ options.getProperty( exclusiveStr ) ) );
+ }
+
+ std::string maxPendingMsgLimitStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
+
+ if( options.hasProperty( maxPendingMsgLimitStr ) )
+ {
+ info->setMaximumPendingMessageLimit(
+ Integer::parseInt(
+ options.getProperty( maxPendingMsgLimitStr ) ) );
+ }
+
+ std::string prefetchSizeStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
+
+ if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr ) )
+ {
+ info->setPrefetchSize(
+ Integer::parseInt(
+ options.getProperty( prefetchSizeStr, "1000" ) ) );
+ }
+
+ std::string retroactiveStr =
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONSUMER_RETROACTIVE );
+
+ if( options.hasProperty( retroactiveStr ) )
+ {
+ info->setRetroactive(
+ Boolean::parseBoolean(
+ options.getProperty( retroactiveStr ) ) );
+ }
+
+ std::string browserStr = "consumer.browser";
+
+ if( options.hasProperty( browserStr ) )
+ {
+ info->setBrowser(
+ Boolean::parseBoolean(
+ options.getProperty( browserStr ) ) );
+ }
+
+ std::string networkSubscriptionStr = "consumer.networkSubscription";
+
+ if( options.hasProperty( networkSubscriptionStr ) )
+ {
+ info->setNetworkSubscription(
+ Boolean::parseBoolean(
+ options.getProperty( networkSubscriptionStr ) ) );
+ }
+
+ std::string optimizedAcknowledgeStr = "consumer.optimizedAcknowledge";
+
+ if( options.hasProperty( optimizedAcknowledgeStr ) )
+ {
+ info->setOptimizedAcknowledge(
+ Boolean::parseBoolean(
+ options.getProperty( optimizedAcknowledgeStr ) ) );
+ }
+
+ std::string noRangeAcksStr = "consumer.noRangeAcks";
+
+ if( options.hasProperty( noRangeAcksStr ) )
+ {
+ info->setNoRangeAcks(
+ Boolean::parseBoolean(
+ options.getProperty( noRangeAcksStr ) ) );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
commands::ConsumerInfo* OpenWireConnector::createConsumerInfo(
const cms::Destination* destination,
connector::SessionInfo* session )
@@ -482,7 +624,7 @@
consumerId->setConnectionId( session->getConnectionId() );
consumerId->setSessionId( session->getSessionId() );
- consumerId->setValue( getNextConsumerId() );
+ consumerId->setValue( getNextConsumerId() );
// Cast the destination to an OpenWire destination, so we can
// get all the goodies.
@@ -497,37 +639,6 @@
consumerInfo->setDestination(
dynamic_cast<commands::ActiveMQDestination*>(
amqDestination->cloneDataStructure()) );
-
- // Get any options specified in the destination and apply them to the
- // ConsumerInfo object.
- const Properties& options = amqDestination->getOptions();
- consumerInfo->setBrowser( Boolean::parseBoolean(
- options.getProperty( "consumer.browser", "false" )) );
- consumerInfo->setPrefetchSize( Integer::parseInt(
- options.getProperty( "consumer.prefetchSize", "1000" )) );
- consumerInfo->setMaximumPendingMessageLimit( Integer::parseInt(
- options.getProperty( "consumer.maximumPendingMessageLimit", "0" )) );
- consumerInfo->setDispatchAsync( Boolean::parseBoolean(
- options.getProperty( "consumer.dispatchAsync", "false" )) );
- consumerInfo->setExclusive( Boolean::parseBoolean(
- options.getProperty( "consumer.exclusive", "false" )) );
- consumerInfo->setRetroactive( Boolean::parseBoolean(
- options.getProperty( "consumer.retroactive", "false" )) );
- consumerInfo->setPriority( Integer::parseInt(
- options.getProperty( "consumer.priority", "0" )) );
- consumerInfo->setNetworkSubscription( Boolean::parseBoolean(
- options.getProperty( "consumer.networkSubscription", "false" )) );
- consumerInfo->setOptimizedAcknowledge( Boolean::parseBoolean(
- options.getProperty( "consumer.optimizedAcknowledge", "false" )) );
- consumerInfo->setNoRangeAcks( Boolean::parseBoolean(
- options.getProperty( "consumer.noRangeAcks", "false" )) );
-
- // Send the message to the broker.
- Response* response = syncRequest(consumerInfo);
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
return consumerInfo;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=517008&r1=517007&r2=517008
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Sun Mar 11 13:30:33 2007
@@ -658,6 +658,11 @@
commands::TransactionId* createLocalTransactionId()
throw ( ConnectorException );
+ /**
+ * Applies the destination options to the given consumer.
+ */
+ void applyDestinationOptions( commands::ConsumerInfo* info );
+
};
}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?view=diff&rev=517008&r1=517007&r2=517008
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Sun Mar 11 13:30:33 2007
@@ -369,7 +369,7 @@
ActiveMQConstants::toString(
ActiveMQConstants::CONSUMER_NOLOCAL );
- if( destProperties.getProperty( noLocalStr, "false" ) == "true" )
+ if( destProperties.hasProperty( noLocalStr ) )
{
command.setNoLocal(
Boolean::parseBoolean(