You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/11/17 16:56:47 UTC

svn commit: r718265 - in /activemq/activemq-cpp/trunk/src/main/activemq: connector/openwire/ connector/openwire/commands/ util/

Author: tabish
Date: Mon Nov 17 07:56:47 2008
New Revision: 718265

URL: http://svn.apache.org/viewvc?rev=718265&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-196

Complete the initial implementation of async send from the connector, deprecating the AsyncTransport.  Initial Memory usage tracking is in place but will probably need more work as users start using it.  

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.h
    activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.cpp

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Mon Nov 17 07:56:47 2008
@@ -100,6 +100,21 @@
             core::ActiveMQConstants::toString(
                 core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" ) ) );
 
+    this->setProducerWindowSize( decaf::lang::Integer::parseInt(
+        properties.getProperty(
+            core::ActiveMQConstants::toString(
+                core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) ) );
+
+    this->setSendTimeout( decaf::lang::Integer::parseInt(
+        properties.getProperty(
+            core::ActiveMQConstants::toString(
+                core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) ) );
+
+    this->setCloseTimeout( decaf::lang::Integer::parseInt(
+        properties.getProperty(
+            core::ActiveMQConstants::toString(
+                core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) ) );
+
     this->exceptionListener = NULL;
     this->messageListener = NULL;
     this->brokerInfo = NULL;
@@ -624,6 +639,11 @@
         // Send the message to the broker.
         syncRequest( producer->getProducerInfo() );
 
+        synchronized( &this->producerInfoMap ) {
+            // Place the Producer into the Map.
+            producerInfoMap.setValue( producer->getProducerId(), producer.get() );
+        }
+
         return producer.release();
     }
     AMQ_CATCH_RETHROW( ConnectorException )
@@ -774,6 +794,10 @@
             ( !amqMessage->isPersistent() || this->isUseAsyncSend() ||
               amqMessage->getTransactionId() != NULL ) ) {
 
+            if( producer->isUsageTrackingEnabled() ) {
+                producer->enqueUsage( amqMessage->getSize() );
+            }
+
             // No Response Required.
             this->oneway( amqMessage );
 
@@ -1255,20 +1279,28 @@
                 dynamic_cast<OpenWireConsumerInfo*>( resource );
 
             // Remove this consumer from the consumer info map
-            synchronized( &consumerInfoMap ) {
-                consumerInfoMap.remove(
+            synchronized( &this->consumerInfoMap ) {
+                this->consumerInfoMap.remove(
                     consumer->getConsumerInfo()->getConsumerId()->getValue() );
             }
 
-            // Unstarted Consumers can just be deleted.
+            // Non-started Consumers can just be deleted.
             if( consumer->isStarted() == false ) {
                 return;
             }
 
             dataStructure = consumer->getConsumerInfo()->getConsumerId();
         } else if( typeid( *resource ) == typeid( OpenWireProducerInfo ) ) {
+
             OpenWireProducerInfo* producer =
                 dynamic_cast<OpenWireProducerInfo*>( resource );
+
+            // Remove this consumer from the consumer info map
+            synchronized( &this->producerInfoMap ) {
+                this->producerInfoMap.remove(
+                    producer->getProducerInfo()->getProducerId()->getValue() );
+            }
+
             dataStructure = producer->getProducerInfo()->getProducerId();
         } else if( typeid( *resource ) == typeid( OpenWireSessionInfo ) ) {
             OpenWireSessionInfo* session =
@@ -1310,10 +1342,11 @@
                 dynamic_cast<commands::MessageDispatch*>( command );
 
             // Due to the severe suckiness of C++, in order to cast to
-            // a type that is in a different branch of the inheritence hierarchy
+            // a type that is in a different branch of the inheritance hierarchy
             // we have to cast to the type at the "crotch" of the branch and then
             // we can implicitly cast up the other branch.
-            core::ActiveMQMessage* message = dynamic_cast<core::ActiveMQMessage*>(dispatch->getMessage());
+            core::ActiveMQMessage* message =
+                dynamic_cast<core::ActiveMQMessage*>( dispatch->getMessage() );
             if( message == NULL ) {
                 delete command;
                 throw OpenWireConnectorException(
@@ -1354,9 +1387,17 @@
 
         } else if( typeid( *command ) == typeid( commands::ProducerAck ) ) {
 
-            // TODO - Apply The Ack.
-            //commands::ProducerAck* producerAck =
-            //    dynamic_cast<commands::ProducerAck*>( command );
+            commands::ProducerAck* producerAck =
+                dynamic_cast<commands::ProducerAck*>( command );
+
+            // Get the consumer info object for this consumer.
+            OpenWireProducerInfo* info = NULL;
+            synchronized( &this->producerInfoMap ) {
+                info = producerInfoMap.getValue( producerAck->getProducerId()->getValue() );
+                if( info != NULL && info->isUsageTrackingEnabled() ){
+                    info->decreaseUsage( producerAck->getSize() );
+                }
+            }
 
             delete command;
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Mon Nov 17 07:56:47 2008
@@ -56,6 +56,7 @@
 #include <activemq/connector/openwire/OpenWireCommandWriter.h>
 #include <activemq/connector/openwire/OpenWireFormatNegotiator.h>
 #include <activemq/connector/openwire/OpenWireConsumerInfo.h>
+#include <activemq/connector/openwire/OpenWireProducerInfo.h>
 
 #include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
@@ -96,8 +97,8 @@
         transport::Transport* transport;
 
         /**
-         * The OpenWireFormat class that controls Protocal versions and
-         * marshalling details.
+         * The OpenWireFormat class that controls Protocol versions and
+         * marshaling details.
          */
         OpenWireFormat* wireFormat;
 
@@ -148,22 +149,22 @@
         OpenWireCommandWriter writer;
 
         /**
-         * Next avaliable Producer Id
+         * Next available Producer Id
          */
         util::LongSequenceGenerator producerIds;
 
         /**
-         * Next avaliable Producer Sequence Id
+         * Next available Producer Sequence Id
          */
         util::LongSequenceGenerator producerSequenceIds;
 
         /**
-         * Next avaliable Consumer Id
+         * Next available Consumer Id
          */
         util::LongSequenceGenerator consumerIds;
 
         /**
-         * Next avaliable Transaction Id
+         * Next available Transaction Id
          */
         util::LongSequenceGenerator transactionIds;
 
@@ -183,12 +184,16 @@
         decaf::util::Properties properties;
 
         /**
-         * Mapping of consumer IDs to their respective
-         * consumer info object.
+         * Mapping of consumer IDs to their respective consumer info object.
          */
         decaf::util::Map< long long, OpenWireConsumerInfo* > consumerInfoMap;
 
         /**
+         * Mapping of producer IDs to their respective producer info object.
+         */
+        decaf::util::Map< long long, OpenWireProducerInfo* > producerInfoMap;
+
+        /**
          * Boolean indicating that we are to always send message Synchronously.
          * This overrides the sending on non-persistent messages or transacted
          * messages asynchronously, also fully overrides the useAsyncSend flag.
@@ -203,6 +208,23 @@
          */
         bool useAsyncSend;
 
+        /**
+         * Send Timeout, forces all messages to be sent Synchronously.
+         */
+        unsigned int sendTimeout;
+
+        /**
+         * Close Timeout, time to wait for a Closed message from the broker before
+         * giving up and just shutting down the connection.
+         */
+        unsigned int closeTimeout;
+
+        /**
+         * Producer Window Size, amount of memory that can be used before the producer
+         * blocks and waits for ProducerAck messages.
+         */
+        unsigned int producerWindowSize;
+
     private:
 
         /**
@@ -322,7 +344,7 @@
 
         /**
          * Creates a Session Info object for this connector
-         * @param Acknowledgement Mode of the Session
+         * @param Acknowledgment Mode of the Session
          * @returns Session Info Object
          * @throws ConnectorException
          */
@@ -582,7 +604,7 @@
         }
 
         /**
-         * Sets the Listner of exceptions for this connector
+         * Sets the Listener of exceptions for this connector
          * @param ExceptionListener the observer.
          */
         virtual void setExceptionListener(
@@ -604,7 +626,7 @@
         /**
          * Pulls a message from the the service provider that this Connector is
          * associated with. This could be because the service has a prefetch
-         * policy that is set to zero and therefor requires each message to
+         * policy that is set to zero and therefore requires each message to
          * be pulled from the server to the client via a poll.
          * @param info - the consumer info for the consumer to pull for
          * @param timeout - the time that the caller is going to wait for new messages
@@ -668,35 +690,62 @@
             this->useAsyncSend = value;
         }
 
-    private:
+        /**
+         * Gets the assigned send timeout for this Connector
+         * @return the send timeout configured in the connection uri
+         */
+        unsigned int getSendTimeout() const {
+            return this->sendTimeout;
+        }
+
+        /**
+         * Sets the send timeout to use when sending Message objects, this will
+         * cause all messages to be sent using a Synchronous request is non-zero.
+         * @param timeout - The time to wait for a response.
+         */
+        void setSendTimeout( unsigned int timeout ) {
+            this->sendTimeout = timeout;
+        }
+
+        /**
+         * Gets the assigned close timeout for this Connector
+         * @return the close timeout configured in the connection uri
+         */
+        unsigned int getCloseTimeout() const {
+            return this->closeTimeout;
+        }
+
+        /**
+         * Sets the close timeout to use when sending the disconnect request.
+         * @param timeout - The time to wait for a close message.
+         */
+        void setCloseTimeout( unsigned int timeout ) {
+            this->closeTimeout = timeout;
+        }
 
-        // Gets the configured producer window size to use when creating new
-        // producers to control how much memory is used.
-        virtual unsigned int getProducerWindowSize() const {
-            return decaf::lang::Integer::parseInt(
-                properties.getProperty(
-                    core::ActiveMQConstants::toString(
-                        core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) );
-        }
-
-        // Gets the time to wait for a producer send to complete, meaning the time to
-        // wait for a response.  Zero indicates to wait forever.
-        virtual unsigned int getSendTimeout() const {
-            return decaf::lang::Integer::parseInt(
-                properties.getProperty(
-                    core::ActiveMQConstants::toString(
-                        core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) );
-        }
-
-        // Gets the time to wait for a response from the Broker when the close message
-        // is sent.
-        virtual unsigned int getCloseTimeout() const {
-            return decaf::lang::Integer::parseInt(
-                properties.getProperty(
-                    core::ActiveMQConstants::toString(
-                        core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) );
+        /**
+         * Gets the configured producer window size for Producers that are created
+         * from this connector.  This only applies if there is no send timeout and the
+         * producer is able to send asynchronously.
+         * @return size in bytes of messages that this producer can produce before
+         *         it must block and wait for ProducerAck messages to free resources.
+         */
+        unsigned int getProducerWindowSize() const {
+            return this->producerWindowSize;
         }
 
+        /**
+         * Sets the size in Bytes of messages that a producer can send before it is blocked
+         * to await a ProducerAck from the broker that frees enough memory to allow another
+         * message to be sent.
+         * @param windowSize - The size in bytes of the Producers memory window.
+         */
+        void setProducerWindowSize( unsigned int windowSize ) {
+            this->producerWindowSize = windowSize;
+        }
+
+    private:
+
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() throw ( ConnectorException );
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h Mon Nov 17 07:56:47 2008
@@ -22,6 +22,8 @@
 #include <activemq/connector/ProducerInfo.h>
 #include <activemq/connector/openwire/commands/ProducerInfo.h>
 #include <cms/Destination.h>
+#include <activemq/util/MemoryUsage.h>
+#include <memory>
 
 namespace activemq{
 namespace connector{
@@ -42,9 +44,8 @@
         // Send timeout, how long to wait for a response before failing.
         unsigned int sendTimeout;
 
-        // Producer Window, number of messages to send before waiting for
-        // the broker to send ProducerAcks.  Openwire 3.0 only.
-        unsigned long long producerWindow;
+        // Memory Usage if we are using Producer Window Sizes
+        std::auto_ptr<activemq::util::MemoryUsage> usage;
 
     public:
 
@@ -55,7 +56,6 @@
             this->producerInfo = NULL;
             this->session = NULL;
             this->sendTimeout = 0;
-            this->producerWindow = 0;
         }
 
         virtual ~OpenWireProducerInfo() {
@@ -95,8 +95,7 @@
          */
         virtual long long getProducerId() const {
             if( this->producerInfo != NULL ) {
-                return (unsigned int)
-                    this->producerInfo->getProducerId()->getValue();
+                return (unsigned int)this->producerInfo->getProducerId()->getValue();
             }
 
             return 0;
@@ -147,6 +146,11 @@
          */
         virtual void setProducerInfo( commands::ProducerInfo* producerInfo ) {
             this->producerInfo = producerInfo;
+
+            if( this->producerInfo != NULL && this->producerInfo->getWindowSize() > 0 ) {
+                this->usage.reset( new activemq::util::MemoryUsage(
+                    this->producerInfo->getWindowSize() ) );
+            }
         }
 
         /**
@@ -187,21 +191,39 @@
         }
 
         /**
-         * Gets the currently Set Producer Window
-         * @return the set producer window.
+         * Queries if this Producer is tracking memory usage, returns true if so.
+         * @return true if this producer is tracking memory usage.
          */
-        virtual unsigned long long getProducerWindow() const {
-            return this->producerWindow;
+        virtual bool isUsageTrackingEnabled() const {
+            return this->usage.get() != NULL;
         }
 
         /**
-         * Sets the Producer Window, which is the max number of messages to send before
-         * timing waiting for acks from the broker. (Openwire 3.0 only).
-         * @param windowSize - The number of message to send before a block to wait for
-         * the receipt of a ProducerAck.
+         * Enqueues more usage on this producer's current Memory usage, if there
+         * is not enough space, then this call blocks until there is room for the
+         * requested space.  If Memory Usage tracking is not enabled then this method
+         * does nothing.
+         *
+         * @param usage - Size in bytes of the usage to enqueue
          */
-        virtual void setProducerWindow( unsigned long long window ) {
-            this->producerWindow = window;
+        virtual void enqueUsage( unsigned int usage ) {
+            if( this->usage.get() != NULL ) {
+                this->usage->enqueueUsage( usage );
+            }
+        }
+
+        /**
+         * Frees up a given amount of usage from the usage being tracked for this
+         * Producer.  If a previous call to enqueueUsage was blocked and enough space
+         * is freed for it to continue then it will be woken up.  If memory usage
+         * tracking is not enabled then this method has no effect.
+         *
+         * @param usage - Size in bytes of the usage to enqueue
+         */
+        virtual void decreaseUsage( unsigned int usage ) {
+            if( this->usage.get() != NULL ) {
+                this->usage->decreaseUsage( usage );
+            }
         }
 
     };

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.cpp?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.cpp Mon Nov 17 07:56:47 2008
@@ -105,39 +105,39 @@
     const Message* srcPtr = dynamic_cast<const Message*>( src );
 
     if( srcPtr == NULL || src == NULL ) {
-    
+
         throw decaf::lang::exceptions::NullPointerException(
             __FILE__, __LINE__,
             "Message::copyDataStructure - src is NULL or invalid" );
     }
     if( srcPtr->getProducerId() != NULL ) {
-        this->setProducerId( 
-            dynamic_cast<ProducerId*>( 
+        this->setProducerId(
+            dynamic_cast<ProducerId*>(
                 srcPtr->getProducerId()->cloneDataStructure() ) );
     }
     if( srcPtr->getDestination() != NULL ) {
-        this->setDestination( 
-            dynamic_cast<ActiveMQDestination*>( 
+        this->setDestination(
+            dynamic_cast<ActiveMQDestination*>(
                 srcPtr->getDestination()->cloneDataStructure() ) );
     }
     if( srcPtr->getTransactionId() != NULL ) {
-        this->setTransactionId( 
-            dynamic_cast<TransactionId*>( 
+        this->setTransactionId(
+            dynamic_cast<TransactionId*>(
                 srcPtr->getTransactionId()->cloneDataStructure() ) );
     }
     if( srcPtr->getOriginalDestination() != NULL ) {
-        this->setOriginalDestination( 
-            dynamic_cast<ActiveMQDestination*>( 
+        this->setOriginalDestination(
+            dynamic_cast<ActiveMQDestination*>(
                 srcPtr->getOriginalDestination()->cloneDataStructure() ) );
     }
     if( srcPtr->getMessageId() != NULL ) {
-        this->setMessageId( 
-            dynamic_cast<MessageId*>( 
+        this->setMessageId(
+            dynamic_cast<MessageId*>(
                 srcPtr->getMessageId()->cloneDataStructure() ) );
     }
     if( srcPtr->getOriginalTransactionId() != NULL ) {
-        this->setOriginalTransactionId( 
-            dynamic_cast<TransactionId*>( 
+        this->setOriginalTransactionId(
+            dynamic_cast<TransactionId*>(
                 srcPtr->getOriginalTransactionId()->cloneDataStructure() ) );
     }
     this->setGroupID( srcPtr->getGroupID() );
@@ -147,8 +147,8 @@
     this->setExpiration( srcPtr->getExpiration() );
     this->setPriority( srcPtr->getPriority() );
     if( srcPtr->getReplyTo() != NULL ) {
-        this->setReplyTo( 
-            dynamic_cast<ActiveMQDestination*>( 
+        this->setReplyTo(
+            dynamic_cast<ActiveMQDestination*>(
                 srcPtr->getReplyTo()->cloneDataStructure() ) );
     }
     this->setTimestamp( srcPtr->getTimestamp() );
@@ -156,21 +156,21 @@
     this->setContent( srcPtr->getContent() );
     this->setMarshalledProperties( srcPtr->getMarshalledProperties() );
     if( srcPtr->getDataStructure() != NULL ) {
-        this->setDataStructure( 
-            dynamic_cast<DataStructure*>( 
+        this->setDataStructure(
+            dynamic_cast<DataStructure*>(
                 srcPtr->getDataStructure()->cloneDataStructure() ) );
     }
     if( srcPtr->getTargetConsumerId() != NULL ) {
-        this->setTargetConsumerId( 
-            dynamic_cast<ConsumerId*>( 
+        this->setTargetConsumerId(
+            dynamic_cast<ConsumerId*>(
                 srcPtr->getTargetConsumerId()->cloneDataStructure() ) );
     }
     this->setCompressed( srcPtr->isCompressed() );
     this->setRedeliveryCounter( srcPtr->getRedeliveryCounter() );
     for( size_t ibrokerPath = 0; ibrokerPath < srcPtr->getBrokerPath().size(); ++ibrokerPath ) {
         if( srcPtr->getBrokerPath()[ibrokerPath] != NULL ) {
-            this->getBrokerPath().push_back( 
-                dynamic_cast<BrokerId*>( 
+            this->getBrokerPath().push_back(
+                dynamic_cast<BrokerId*>(
                     srcPtr->getBrokerPath()[ibrokerPath]->cloneDataStructure() ) );
         } else {
             this->getBrokerPath().push_back( NULL );
@@ -182,8 +182,8 @@
     this->setDroppable( srcPtr->isDroppable() );
     for( size_t icluster = 0; icluster < srcPtr->getCluster().size(); ++icluster ) {
         if( srcPtr->getCluster()[icluster] != NULL ) {
-            this->getCluster().push_back( 
-                dynamic_cast<BrokerId*>( 
+            this->getCluster().push_back(
+                dynamic_cast<BrokerId*>(
                     srcPtr->getCluster()[icluster]->cloneDataStructure() ) );
         } else {
             this->getCluster().push_back( NULL );
@@ -195,7 +195,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 unsigned char Message::getDataStructureType() const {
-    return Message::ID_MESSAGE; 
+    return Message::ID_MESSAGE;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -204,7 +204,7 @@
     ostringstream stream;
 
     stream << "Begin Class = Message" << std::endl;
-    stream << " Value of Message::ID_MESSAGE = 0" << std::endl; 
+    stream << " Value of Message::ID_MESSAGE = 0" << std::endl;
     stream << " Value of ProducerId is Below:" << std::endl;
     if( this->getProducerId() != NULL ) {
         stream << this->getProducerId()->toString() << std::endl;
@@ -456,6 +456,17 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+unsigned int Message::getSize() const {
+
+    long long size = DEFAULT_MESSAGE_SIZE;
+
+    size += this->getContent().size();
+    size += this->getMarshalledProperties().size();
+
+    return size;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 const ProducerId* Message::getProducerId() const {
     return producerId;
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.h?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/commands/Message.h Mon Nov 17 07:56:47 2008
@@ -57,6 +57,8 @@
     {
     protected:
 
+        static const unsigned int DEFAULT_MESSAGE_SIZE = 1024;
+
         ProducerId* producerId;
         ActiveMQDestination* destination;
         TransactionId* transactionId;
@@ -132,6 +134,12 @@
          */
         virtual bool equals( const DataStructure* value ) const;
 
+        /**
+         * Returns the Size of this message in Bytes.
+         * @returns number of bytes this message equates to.
+         */
+        virtual unsigned int getSize() const;
+
         virtual const ProducerId* getProducerId() const;
         virtual ProducerId* getProducerId();
         virtual void setProducerId( ProducerId* producerId );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.cpp?rev=718265&r1=718264&r2=718265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/util/MemoryUsage.cpp Mon Nov 17 07:56:47 2008
@@ -80,7 +80,7 @@
     }
 
     synchronized( &mutex ) {
-        this->usage -= value;
+        value > this->usage ? this->usage = 0 : this->usage -= value;
         mutex.notifyAll();
     }
 }