You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/08/30 20:45:21 UTC
svn commit: r690547 - in /activemq/activemq-cpp/trunk/src: examples/
main/activemq/connector/openwire/ main/activemq/core/
Author: tabish
Date: Sat Aug 30 11:45:21 2008
New Revision: 690547
URL: http://svn.apache.org/viewvc?rev=690547&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-196
sends are sync or async based on the rules and settings described in the issue.
Currently no producer flow control for async sends.
Modified:
activemq/activemq-cpp/trunk/src/examples/main.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
Modified: activemq/activemq-cpp/trunk/src/examples/main.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/examples/main.cpp?rev=690547&r1=690546&r2=690547&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/examples/main.cpp (original)
+++ activemq/activemq-cpp/trunk/src/examples/main.cpp Sat Aug 30 11:45:21 2008
@@ -371,12 +371,15 @@
// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
//
std::string brokerURI =
- "tcp://127.0.0.1:61616"
- "?wireFormat=openwire";
-// "&transport.useAsyncSend=true";
+ "tcp://127.0.0.1:61616" // standard IPv4 loopback address, same host as before this change
+ "?wireFormat=openwire"
+// "&connection.alwaysSyncSend=true" // uncomment to force every send to be synchronous
+// "&connection.useAsyncSend=true" // uncomment to send normally-synchronous messages asynchronously
+// "&transport.useAsyncSend=true"
// "&transport.commandTracingEnabled=true"
-// "&transport.tcpTracingEnabled=true";
-// "&wireFormat.tightEncodingEnabled=true";
+// "&transport.tcpTracingEnabled=true"
+// "&wireFormat.tightEncodingEnabled=true"
+ ;
//============================================================
// set to true to use topics instead of queues
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=690547&r1=690546&r2=690547&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sat Aug 30 11:45:21 2008
@@ -88,6 +88,17 @@
this->state = CONNECTION_STATE_DISCONNECTED;
}
+ // Check the connection options
+ this->setAlwaysSyncSend( Boolean::parseBoolean(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND ), "false" ) ) );
+
+ this->setUseAsyncSend( Boolean::parseBoolean(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" ) ) );
+
this->exceptionListener = NULL;
this->messageListener = NULL;
this->brokerInfo = NULL;
@@ -855,12 +866,24 @@
getTransactionId()->cloneDataStructure() );
}
- // Send the message to the broker.
- Response* response = syncRequest( amqMessage, producerInfo->getSendTimeout() );
+ if( producerInfo->getSendTimeout() <= 0 &&
+ !amqMessage->isResponseRequired() &&
+ !this->isAlwaysSyncSend() &&
+ ( !amqMessage->isPersistent() || this->isUseAsyncSend() ||
+ amqMessage->getTransactionId() != NULL ) ) {
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
+ // No Response Required.
+ this->oneway( amqMessage );
+
+ } else {
+
+ // Send the message to the broker.
+ Response* response = syncRequest( amqMessage, producerInfo->getSendTimeout() );
+
+ // The broker did not return an error - this is good.
+ // Just discard the response.
+ delete response;
+ }
} catch( ConnectorException& ex ){
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=690547&r1=690546&r2=690547&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Sat Aug 30 11:45:21 2008
@@ -188,6 +188,21 @@
*/
decaf::util::Map< long long, OpenWireConsumerInfo* > consumerInfoMap;
+ /**
+ * Boolean indicating that we are to always send messages synchronously.
+ * When set, this overrides the asynchronous sending of non-persistent or
+ * transacted messages, and it also fully overrides the useAsyncSend flag.
+ */
+ bool alwaysSyncSend;
+
+ /**
+ * Boolean indicating that any messages we would normally send synchronously
+ * are to be sent using an asynchronous send instead. Normally we send all
+ * persistent messages that are not in a transaction synchronously and all
+ * others asynchronously. This flag only applies if alwaysSyncSend is false.
+ */
+ bool useAsyncSend;
+
private:
/**
@@ -619,6 +634,40 @@
transport::Transport* source,
const decaf::lang::Exception& ex );
+ public: // Local Getters and Setters
+
+ /**
+ * Reports whether the alwaysSyncSend option is currently enabled.
+ * @returns the stored alwaysSyncSend flag: true if on, false if not.
+ */
+ bool isAlwaysSyncSend() const {
+ return alwaysSyncSend;
+ }
+
+ /**
+ * Stores a new value for the alwaysSyncSend option.
+ * @param value - true to enable the option, false to disable it.
+ */
+ void setAlwaysSyncSend( bool value ) {
+ alwaysSyncSend = value;
+ }
+
+ /**
+ * Reports whether the useAsyncSend option is currently enabled.
+ * @returns the stored useAsyncSend flag: true if on, false if not.
+ */
+ bool isUseAsyncSend() const {
+ return useAsyncSend;
+ }
+
+ /**
+ * Stores a new value for the useAsyncSend option.
+ * @param value - true to enable the option, false to disable it.
+ */
+ void setUseAsyncSend( bool value ) {
+ useAsyncSend = value;
+ }
+
private:
// Gets the configured producer window size to use when creating new
@@ -627,7 +676,7 @@
return decaf::lang::Integer::parseInt(
properties.getProperty(
core::ActiveMQConstants::toString(
- core::ActiveMQConstants::PARAM_PRODUCERWINDOWSIZE ), "0" ) );
+ core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) );
}
// Gets the time to wait for a producer send to complete, meaning the time to
@@ -636,7 +685,7 @@
return decaf::lang::Integer::parseInt(
properties.getProperty(
core::ActiveMQConstants::toString(
- core::ActiveMQConstants::PARAM_SENDTIMEOUT ), "0" ) );
+ core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) );
}
// Gets the time to wait for a response from the Broker when the close message
@@ -645,7 +694,7 @@
return decaf::lang::Integer::parseInt(
properties.getProperty(
core::ActiveMQConstants::toString(
- core::ActiveMQConstants::PARAM_CLOSETIMEOUT ), "15000" ) );
+ core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) );
}
// Check for Connected State and Throw an exception if not.
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp?rev=690547&r1=690546&r2=690547&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp Sat Aug 30 11:45:21 2008
@@ -45,9 +45,11 @@
destOptions[CONSUMER_EXCLUSIVE] = "consumer.exclusive";
destOptions[CONSUMER_PRIORITY] = "consumer.priority";
- uriParams[PARAM_CLOSETIMEOUT] = "connection.closeTimeout";
- uriParams[PARAM_SENDTIMEOUT] = "connection.sendTimeout";
- uriParams[PARAM_PRODUCERWINDOWSIZE] = "connection.producerWidowSize";
+ uriParams[CONNECTION_CLOSETIMEOUT] = "connection.closeTimeout";
+ uriParams[CONNECTION_SENDTIMEOUT] = "connection.sendTimeout";
+ uriParams[CONNECTION_PRODUCERWINDOWSIZE] = "connection.producerWindowSize"; // key must be spelled "Window" so the URI option is matched
+ uriParams[CONNECTION_ALWAYSSYNCSEND] = "connection.alwaysSyncSend";
+ uriParams[CONNECTION_USEASYNCSEND] = "connection.useAsyncSend";
uriParams[PARAM_USERNAME] = "username";
uriParams[PARAM_PASSWORD] = "password";
uriParams[PARAM_CLIENTID] = "client-id";
@@ -58,4 +60,5 @@
for( int ix=0; ix<NUM_PARAMS; ++ix ){
uriParamsMap[uriParams[ix]] = (URIParam)ix;
}
+
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h?rev=690547&r1=690546&r2=690547&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h Sat Aug 30 11:45:21 2008
@@ -54,12 +54,14 @@
*/
enum URIParam
{
- PARAM_SENDTIMEOUT,
- PARAM_PRODUCERWINDOWSIZE,
+ CONNECTION_SENDTIMEOUT,
+ CONNECTION_PRODUCERWINDOWSIZE,
+ CONNECTION_CLOSETIMEOUT,
+ CONNECTION_ALWAYSSYNCSEND, // identifies the "connection.alwaysSyncSend" option
+ CONNECTION_USEASYNCSEND, // identifies the "connection.useAsyncSend" option
PARAM_USERNAME,
PARAM_PASSWORD,
PARAM_CLIENTID,
- PARAM_CLOSETIMEOUT,
NUM_PARAMS // entry count; used as the loop bound when the name lookup map is filled
};