You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2008/01/10 06:35:49 UTC

svn commit: r610683 - in /activemq/activemq-cpp/trunk/src/main/activemq/cmsutil: CmsTemplate.cpp CmsTemplate.h SessionCallback.h

Author: nmittler
Date: Wed Jan  9 21:35:49 2008
New Revision: 610683

URL: http://svn.apache.org/viewvc?rev=610683&view=rev
Log:
AMQCPP-152 - Adding classes for support of CmsTemplate

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h
Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp?rev=610683&r1=610682&r2=610683&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp Wed Jan  9 21:35:49 2008
@@ -33,6 +33,8 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 CmsTemplate::~CmsTemplate() {
+    // Delete the session pools created by createSessionPools().
+    destroySessionPools();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -41,7 +43,7 @@
     defaultDestinationName = "";
     messageIdEnabled = true;
     messageTimestampEnabled = true;
-    pubSubNoLocal = false;
+    noLocal = false;
     receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
     explicitQosEnabled = false;
     deliveryMode = cms::DeliveryMode::PERSISTENT;
@@ -50,13 +52,43 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void CmsTemplate::init() throw (cms::CMSException, IllegalStateException) {
+void CmsTemplate::createSessionPools() {
+
+    // Drop any pools left over from an earlier call before rebuilding.
+    destroySessionPools();
+
+    // Build one pool per slot; the slot index doubles as the numeric
+    // value of the cms::Session::AcknowledgeMode handed to that pool.
+    for( int mode = 0; mode < NUM_SESSION_POOLS; ++mode ) {
+        const cms::Session::AcknowledgeMode ackMode =
+            static_cast<cms::Session::AcknowledgeMode>( mode );
+        sessionPools[mode] = new SessionPool(
+            connection, ackMode, getResourceLifecycleManager() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroySessionPools() {
     
+    // Free each pool and NULL its slot so a repeated call is harmless.
+    for( int slot = 0; slot < NUM_SESSION_POOLS; ++slot ) {
+        SessionPool*& pool = sessionPools[slot];
+        if( pool == NULL ) {
+            continue;
+        }
+        delete pool;
+        pool = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::init() throw (cms::CMSException, IllegalStateException) {
+
     // Invoke the base class.
     CmsDestinationAccessor::init();
-    
+
     // Make sure we have a valid default destination.
-    checkDefaultDestination();   
+    checkDefaultDestination();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -68,3 +100,120 @@
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+cms::Connection* CmsTemplate::getConnection()
+throw (cms::CMSException) {
+
+    try {
+
+        // Fast path: the connection was already set up by an earlier call.
+        if( connection != NULL ) {
+            return connection;
+        }
+
+        // First use: obtain the connection from createConnection() ...
+        connection = createConnection();
+
+        // ... and build the session pools, which are constructed with it.
+        createSessionPools();
+
+        return connection;
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* CmsTemplate::createSession()
+throw (cms::CMSException) {
+    try {
+        // The pools are built lazily by getConnection(); make sure it has run.
+        getConnection();
+        // Entries are SessionPool pointers, so dereference with '->'.
+        return sessionPools[getSessionAcknowledgeMode()]->takeSession();
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroySession( cms::Session* session )
+throw (cms::CMSException) {
+
+    try {
+
+        // A NULL session needs no cleanup.
+        // A real session is only closed and never deleted here, since it
+        // is a pooled session.
+        if( session != NULL ) {
+            session->close();
+        }
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageProducer* CmsTemplate::createProducer(cms::Session* session,
+        cms::Destination* dest) throw (cms::CMSException) {
+
+    try {
+        // Pair with destroyProducer(), which closes and deletes the producer.
+        cms::MessageProducer* producer = session->createProducer(dest);
+        // Disable message IDs / timestamps only when the template says so.
+        if (!isMessageIdEnabled()) {
+            producer->setDisableMessageID(true);
+        }
+        if (!isMessageTimestampEnabled()) {
+            producer->setDisableMessageTimestamp(true);
+        }
+        return producer;
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroyProducer( cms::MessageProducer* producer)
+throw (cms::CMSException) {
+
+    try {
+
+        // A NULL producer needs no cleanup.
+        if( producer != NULL ) {
+            // Close first, then free the object.
+            // NOTE(review): if close() throws, the delete is skipped.
+            producer->close();
+            delete producer;
+        }
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
+        cms::Destination* dest, const std::string& messageSelector)
+        throw (cms::CMSException) {
+
+    try {
+
+        // The template's noLocal flag is forwarded unchanged to the session.
+        const bool suppressLocal = isNoLocal();
+
+        return session->createConsumer( dest, messageSelector, suppressLocal );
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroyConsumer( cms::MessageConsumer* consumer)
+throw (cms::CMSException) {
+
+    try {
+
+        // A NULL consumer needs no cleanup.
+        if( consumer != NULL ) {
+            // Close first, then free the object.
+            // NOTE(review): if close() throws, the delete is skipped.
+            consumer->close();
+            delete consumer;
+        }
+    }
+    AMQ_CATCH_RETHROW( cms::CMSException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h?rev=610683&r1=610682&r2=610683&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h Wed Jan  9 21:35:49 2008
@@ -30,112 +30,118 @@
     class CmsTemplate : public CmsDestinationAccessor
     {
     public:
-        
+    
         /**
          * Timeout value indicating that a receive operation should
          * check if a message is immediately available without blocking.
          */
         static const long long RECEIVE_TIMEOUT_NO_WAIT = -1;
-
+    
         /**
          * Timeout value indicating a blocking receive without timeout.
          */
         static const long long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
-        
+    
         /**
          * Default message priority.
          */
         static const int DEFAULT_PRIORITY = 4;
-        
+    
         /**
-         * My default, messages should live forever.
+         * By default, messages should live forever.
          */
-        static const long long DEFAULT_TIME_TO_LIVE = 0;
+        static const long long DEFAULT_TIME_TO_LIVE = 0;                
+    
+    private:
+                
+        static const int NUM_SESSION_POOLS = (int)cms::Session::SESSION_TRANSACTED + 1;
+    
+        cms::Connection* connection;
         
-    private:                
+        SessionPool* sessionPools[NUM_SESSION_POOLS]; // array of NUM_SESSION_POOLS pool pointers
         
         cms::Destination* defaultDestination;
-        
+    
         std::string defaultDestinationName;
-        
+    
         bool messageIdEnabled;
-        
+    
         bool messageTimestampEnabled;
-        
-        bool pubSubNoLocal;
-        
+    
+        bool noLocal;
+    
         long long receiveTimeout;
-        
+    
         bool explicitQosEnabled;
-        
+    
         int deliveryMode;
-        
+    
         int priority;
-        
+    
         long long timeToLive;
-        
+    
     public:
-        
-    	CmsTemplate();
-    	CmsTemplate(cms::ConnectionFactory* connectionFactory);
-    	    
-    	virtual ~CmsTemplate();
-    	
-    	/**
+    
+        CmsTemplate();
+        CmsTemplate(cms::ConnectionFactory* connectionFactory);
+    
+        virtual ~CmsTemplate();
+    
+        /**
          * Initializes this object and prepares it for use.  This should be called
-         * before any other methds are called.
+         * before any other methods are called.
          */
-        virtual void init() 
+        virtual void init()
         throw (cms::CMSException, decaf::lang::exceptions::IllegalStateException);
-    	        
-    	virtual void setDefaultDestination(cms::Destination* defaultDestination) {
-    	    this->defaultDestination = defaultDestination;
-    	}
-    	
-    	virtual const cms::Destination* getDefaultDestination() const {
-    	    return this->defaultDestination;
-    	}    	    	
-    	
-    	virtual void setDefaultDestinationName(const std::string& defaultDestinationName) {
-    	    this->defaultDestinationName = defaultDestinationName;
-    	}
-    	
-    	virtual const std::string getDefaultDestinationName() const {
+    
+        virtual void setDefaultDestination(cms::Destination* defaultDestination) {
+            this->defaultDestination = defaultDestination;
+        }
+    
+        virtual const cms::Destination* getDefaultDestination() const {
+            return this->defaultDestination;
+        }
+    
+        virtual void setDefaultDestinationName(const std::string& defaultDestinationName) {
+            this->defaultDestinationName = defaultDestinationName;
+        }
+    
+        virtual const std::string getDefaultDestinationName() const {
             return this->defaultDestinationName;
-        }            	
-    	
-    	virtual void setMessageIdEnabled(bool messageIdEnabled) {
-    	    this->messageIdEnabled = messageIdEnabled;
-    	}
-    	
-    	virtual bool isMessageIdEnabled() const {
+        }
+    
+        virtual void setMessageIdEnabled(bool messageIdEnabled) {
+            this->messageIdEnabled = messageIdEnabled;
+        }
+    
+        virtual bool isMessageIdEnabled() const {
             return this->messageIdEnabled;
-        }                
-        
+        }
+    
         virtual void setMessageTimestampEnabled(bool messageTimestampEnabled) {
             this->messageTimestampEnabled = messageTimestampEnabled;
         }
-        
+    
         virtual bool isMessageTimestampEnabled() const {
             return this->messageTimestampEnabled;
-        }                
-
-        virtual void setPubSubNoLocal(bool pubSubNoLocal) {
-            this->pubSubNoLocal = pubSubNoLocal;
-        }
-
-        virtual bool isPubSubNoLocal() const {
-            return this->pubSubNoLocal;
         }
-        
+    
+        virtual void setNoLocal(bool noLocal) {
+            this->noLocal = noLocal;
+        }
+    
+        virtual bool isNoLocal() const {
+            return this->noLocal;
+        }
+    
         virtual void setReceiveTimeout(long long receiveTimeout) {
             this->receiveTimeout = receiveTimeout;
         }
-
+    
         virtual long long getReceiveTimeout() const {
             return this->receiveTimeout;
         }
-        
+    
         /**
          * Set if the QOS values (deliveryMode, priority, timeToLive)
          * should be used for sending a message.
@@ -147,7 +153,7 @@
         virtual void setExplicitQosEnabled(bool explicitQosEnabled) {
             this->explicitQosEnabled = explicitQosEnabled;
         }
-
+    
         /**
          * If "true", then the values of deliveryMode, priority, and timeToLive
          * will be used when sending a message. Otherwise, the default values,
@@ -163,7 +169,7 @@
         virtual bool isExplicitQosEnabled() const {
             return this->explicitQosEnabled;
         }
-        
+    
         /**
          * Set whether message delivery should be persistent or non-persistent,
          * specified as boolean value ("true" or "false"). This will set the delivery
@@ -175,7 +181,7 @@
         virtual void setDeliveryPersistent(bool deliveryPersistent) {
-            this->deliveryMode = (deliveryPersistent ? cms::DeliveryMode.PERSISTENT : cms::DeliveryMode.NON_PERSISTENT);
+            this->deliveryMode = (deliveryPersistent ? cms::DeliveryMode::PERSISTENT : cms::DeliveryMode::NON_PERSISTENT);
         }
-
+    
         /**
          * Set the delivery mode to use when sending a message.
          * Default is the Message default: "PERSISTENT".
@@ -187,14 +193,14 @@
         virtual void setDeliveryMode(int deliveryMode) {
-            this.deliveryMode = deliveryMode;
+            this->deliveryMode = deliveryMode;
         }
-        
+    
         /**
          * Return the delivery mode to use when sending a message.
          */
         virtual int getDeliveryMode() const {
             return this->deliveryMode;
         }
-        
+    
         /**
          * Set the priority of a message when sending.
          * <p>Since a default value may be defined administratively,
@@ -202,17 +208,17 @@
          * 
          * @see #isExplicitQosEnabled
          */
-        public void setPriority(int priority) {
+        virtual void setPriority(int priority) {
             this->priority = priority;
         }
-            
+    
         /**
          * Return the priority of a message when sending.
          */
         virtual int getPriority() const {
             return this->priority;
         }
-
+    
         /**
          * Set the time-to-live of the message when sending.
          * <p>Since a default value may be defined administratively,
@@ -224,19 +230,107 @@
         virtual void setTimeToLive(long long timeToLive) {
             this->timeToLive = timeToLive;
         }
-
+    
         /**
          * Return the time-to-live of the message when sending.
          */
         virtual long long getTimeToLive() const {
             return this->timeToLive;
         }
-    	
+    
     private:
-        
+    
+        /**
+         * Initializes all members to their defaults.
+         */
         void initDefaults();
         
-        void checkDefaultDestination() throw (decaf::lang::exceptions::IllegalStateException);
+        /**
+         * Creates the session pools objects.
+         */
+        void createSessionPools();
+        
+        /**
+         * Destroys the session pool objects.
+         */
+        void destroySessionPools();
+    
+        /**
+         * Checks that the default destination is valid, if not throws
+         * an exception.
+         * @throws decaf::lang::exceptions::IllegalStateException thrown
+         * if the default destination is invalid.
+         */
+        void checkDefaultDestination()
+        throw (decaf::lang::exceptions::IllegalStateException);
+    
+        /**
+         * Gets the connection, creating it if it doesn't already exist.
+         * @return the connection
+         * @throws cms::CMSException if any of the CMS methods throw.
+         */
+        cms::Connection* getConnection() throw (cms::CMSException);
+        
+        /**
+         * Creates a session initialized with the proper values.
+         * 
+         * @return the session
+         * @throws cms::CMSException if any of the CMS methods throw.
+         */
+        cms::Session* createSession() throw (cms::CMSException);
+    
+        /**
+         * Closes, but does not destroy the pooled session resource.
+         * @param session
+         *          a pooled session resource
+         * @throws cms::CMSException thrown if the CMS methods throw.
+         */
+        void destroySession( cms::Session* session ) throw (cms::CMSException);
+    
+        /**
+         * Allocates a producer initialized with the proper values.
+         * 
+         * @param session
+         *          The session from which to create a producer
+         * @param dest
+         *          The destination for which to create the producer.
+         * @return the producer
+         * @throws cms::CMSException thrown by the CMS API
+         */
+        cms::MessageProducer* createProducer(cms::Session* session,
+                cms::Destination* dest) throw (cms::CMSException);
+        
+        /**
+         * Closes and destroys a producer resource
+         * @param producer
+         *          a producer to destroy
+         * @throws cms::CMSException thrown if the CMS methods throw.
+         */
+        void destroyProducer( cms::MessageProducer* producer ) throw (cms::CMSException);
+    
+        /**
+         * Allocates a consumer initialized with the proper values.
+         * 
+         * @param session
+         *          The session from which to create a consumer
+         * @param dest
+         *          The destination for which to create the consumer
+         * @param messageSelector
+         *          The message selector for the consumer.
+         * @return the new consumer
+         * @throws cms::CMSException if the CMS methods throw.
+         */
+        cms::MessageConsumer* createConsumer(cms::Session* session,
+                cms::Destination* dest, const std::string& messageSelector)
+        throw (cms::CMSException);
+        
+        /**
+         * Closes and destroys a consumer resource
+         * @param consumer
+         *          a consumer to destroy
+         * @throws cms::CMSException thrown if the CMS methods throw.
+         */
+        void destroyConsumer( cms::MessageConsumer* consumer ) throw (cms::CMSException);
     };
 
 }}

Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h?rev=610683&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h Wed Jan  9 21:35:49 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H
+#define ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H
+
+namespace activemq {
+namespace cmsutil {
+
+    /**
+     * Callback for executing any number of operations on a provided
+     * CMS Session.
+     * NOTE(review): includes no CMS headers; assumes the includer supplies them.
+     */
+    class SessionCallback {
+    public:
+        virtual ~SessionCallback() {}
+
+        /**
+         * Execute any number of operations against the supplied CMS
+         * session.
+         *
+         * @param session
+         *          the CMS <code>Session</code>
+         * @throws cms::CMSException if thrown by CMS API methods
+         */
+        virtual void doInCms(cms::Session* session) throw (cms::CMSException) = 0;
+    };
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H*/