You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2006/12/04 01:41:28 UTC

svn commit: r481999 - in /incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: connector/openwire/commands/ connector/stomp/commands/ core/

Author: tabish
Date: Sun Dec  3 16:41:27 2006
New Revision: 481999

URL: http://svn.apache.org/viewvc?view=rev&rev=481999
Log:
http://issues.apache.org/activemq/browse/AMQCPP-14

Added initial Time to Live processing to the Consumer, and the Time Stamping is now correct in the producer.

Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h Sun Dec  3 16:41:27 2006
@@ -434,6 +434,22 @@
          */
         virtual unsigned long long getBodyLength(void) const;
 
+    public:  // ActiveMQMessage
+
+        /**
+         * Checks this message's CMS expiration against the current time.
+         * An expiration of zero or less is treated as "never expires".
+         * @returns true if a positive expiration time has already passed.
+         */
+        virtual bool isExpired() const {
+            const long long deadline = this->getCMSExpiration();
+            const long long now = util::Date::getCurrentTimeMilliseconds();
+
+            // A non-positive expiration means no deadline was set.
+            const bool hasDeadline = deadline > 0;
+            return hasDeadline && now > deadline;
+        }
+
     };
 
 }}}}

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h Sun Dec  3 16:41:27 2006
@@ -578,6 +578,22 @@
         virtual void setString( const std::string& name, 
                                 const std::string& value );
 
+    public:  // ActiveMQMessage
+
+        /**
+         * Checks this message's CMS expiration against the current time.
+         * An expiration of zero or less is treated as "never expires".
+         * @returns true if a positive expiration time has already passed.
+         */
+        virtual bool isExpired() const {
+            const long long deadline = this->getCMSExpiration();
+            const long long now = util::Date::getCurrentTimeMilliseconds();
+
+            // A non-positive expiration means no deadline was set.
+            const bool hasDeadline = deadline > 0;
+            return hasDeadline && now > deadline;
+        }
+
     };
 
 }}}}

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h Sun Dec  3 16:41:27 2006
@@ -22,6 +22,7 @@
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/connector/openwire/marshal/BaseDataStreamMarshaller.h>
 #include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/util/Date.h>
 
 namespace activemq{
 namespace connector{
@@ -106,6 +107,15 @@
             this->redeliveryCount = count;
         }
 
+        /**
+         * Expiration check for this message type.  No expiration time is
+         * consulted here, so the message is never reported as expired.
+         * @returns always false.
+         */
+        virtual bool isExpired() const {
+            return false;
+        }
+      
     private:
    
         core::ActiveMQAckHandler* ackHandler;

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h Sun Dec  3 16:41:27 2006
@@ -67,6 +67,18 @@
         virtual void copyDataStructure( const DataStructure* src ) {
             ActiveMQMessage::copyDataStructure( src );
         }
+
+
+    public:  // ActiveMQMessage
+
+        /**
+         * Expiration check for this message type.  No expiration time is
+         * consulted here, so the message is never reported as expired.
+         * @returns always false.
+         */
+        virtual bool isExpired() const {
+            return false;
+        }
         
     };
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h Sun Dec  3 16:41:27 2006
@@ -68,6 +68,17 @@
             ActiveMQMessage::copyDataStructure( src );
         }
 
+    public:  // ActiveMQMessage
+
+        /**
+         * Expiration check for this message type.  No expiration time is
+         * consulted here, so the message is never reported as expired.
+         * @returns always false.
+         */
+        virtual bool isExpired() const {
+            return false;
+        }
+      
     };
 
 }}}}

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h Sun Dec  3 16:41:27 2006
@@ -428,6 +428,22 @@
          */
         virtual void setText( const std::string& msg ) throw( cms::CMSException );
 
+    public:  // ActiveMQMessage
+
+        /**
+         * Checks this message's CMS expiration against the current time.
+         * An expiration of zero or less is treated as "never expires".
+         * @returns true if a positive expiration time has already passed.
+         */
+        virtual bool isExpired() const {
+            const long long deadline = this->getCMSExpiration();
+            const long long now = util::Date::getCurrentTimeMilliseconds();
+
+            // A non-positive expiration means no deadline was set.
+            const bool hasDeadline = deadline > 0;
+            return hasDeadline && now > deadline;
+        }
+
     };
 
 }}}}

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h Sun Dec  3 16:41:27 2006
@@ -27,6 +27,7 @@
 #include <activemq/exceptions/NoSuchElementException.h>
 #include <activemq/exceptions/RuntimeException.h>
 
+#include <activemq/util/Date.h>
 #include <activemq/util/Long.h>
 #include <activemq/util/Integer.h>
 #include <activemq/util/Boolean.h>
@@ -516,6 +517,20 @@
                 CommandConstants::toString( 
                     CommandConstants::HEADER_REDELIVERYCOUNT ),
                 util::Integer::toString( count ) );
+        }
+
+        /**
+         * Checks this message's CMS expiration against the current time.
+         * An expiration of zero or less is treated as "never expires".
+         * @returns true if a positive expiration time has already passed.
+         */
+        virtual bool isExpired() const {
+            const long long deadline = this->getCMSExpiration();
+            const long long now = util::Date::getCurrentTimeMilliseconds();
+
+            // A non-positive expiration means no deadline was set.
+            const bool hasDeadline = deadline > 0;
+            return hasDeadline && now > deadline;
         }
 
     protected:   

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Sun Dec  3 16:41:27 2006
@@ -291,23 +291,31 @@
 {
     try
     {
+        // Don't dispatch expired messages, ack it and then destroy it
+        if( message->isExpired() ) {
+            session->acknowledge( this, message );
+            delete message;
+
+            // stop now, don't queue
+            return;
+        }
+        
         // If the Session is in ClientAcknowledge mode, then we set the 
         // handler in the message to this object and send it out.  Otherwise
         // we ack it here for all the other Modes.
-        if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE )
-        {
+        if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) {
+            
             // Register ourself so that we can handle the Message's
             // acknowledge method.
             message->setAckHandler( this );
-        }
-        else
-        {
+            
+        } else {
             session->acknowledge( this, message );
         }
 
         // No listener, so we queue it
-        synchronized( &msgQueue )
-        {
+        synchronized( &msgQueue ) {
+            
             msgQueue.push( dynamic_cast< cms::Message* >( message ) );
             msgQueue.notifyAll();
         }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h?view=diff&rev=481999&r1=481998&r2=481999
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h Sun Dec  3 16:41:27 2006
@@ -57,6 +57,13 @@
          * @param count the redelivery count
          */
         virtual void setRedeliveryCount( int count ) = 0;
+        
+        /**
+         * Reports whether this message has expired, i.e. its expiration
+         * time has elapsed.  Each concrete message type supplies the check.
+         * @returns true if the message is expired.
+         */
+        virtual bool isExpired() const = 0;
 
     };