You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2008/01/10 06:35:49 UTC
svn commit: r610683 - in
/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil: CmsTemplate.cpp
CmsTemplate.h SessionCallback.h
Author: nmittler
Date: Wed Jan 9 21:35:49 2008
New Revision: 610683
URL: http://svn.apache.org/viewvc?rev=610683&view=rev
Log:
AMQCPP-152 - Adding classes for support of CmsTemplate
Added:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp?rev=610683&r1=610682&r2=610683&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp Wed Jan 9 21:35:49 2008
@@ -33,6 +33,8 @@
////////////////////////////////////////////////////////////////////////////////
CmsTemplate::~CmsTemplate() {
+
+ destroySessionPools();
}
////////////////////////////////////////////////////////////////////////////////
@@ -41,7 +43,7 @@
defaultDestinationName = "";
messageIdEnabled = true;
messageTimestampEnabled = true;
- pubSubNoLocal = false;
+ noLocal = false;
receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
explicitQosEnabled = false;
deliveryMode = cms::DeliveryMode::PERSISTENT;
@@ -50,13 +52,43 @@
}
////////////////////////////////////////////////////////////////////////////////
-void CmsTemplate::init() throw (cms::CMSException, IllegalStateException) {
+void CmsTemplate::createSessionPools() {
+
+ // Make sure they're destroyed first.
+ destroySessionPools();
+
+ /**
+ * Create the session pools.
+ */
+ for( int ix=0; ix<NUM_SESSION_POOLS; ++ix) {
+ sessionPools[ix] = new SessionPool(connection,
+ (cms::Session::AcknowledgeMode)ix,
+ getResourceLifecycleManager());
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroySessionPools() {
+ /**
+ * Destroy the session pools.
+ */
+ for( int ix=0; ix<NUM_SESSION_POOLS; ++ix) {
+ if( sessionPools[ix] != NULL ) {
+ delete sessionPools[ix];
+ sessionPools[ix] = NULL;
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::init() throw (cms::CMSException, IllegalStateException) {
+
// Invoke the base class.
CmsDestinationAccessor::init();
-
+
// Make sure we have a valid default destination.
- checkDefaultDestination();
+ checkDefaultDestination();
}
////////////////////////////////////////////////////////////////////////////////
@@ -68,3 +100,120 @@
}
}
+////////////////////////////////////////////////////////////////////////////////
+cms::Connection* CmsTemplate::getConnection()
+throw (cms::CMSException) {
+
+ try {
+
+ // If we don't have a connection, create one.
+ if( connection == NULL ) {
+
+ // Invoke the base class to create the connection and add it
+ // to the resource lifecycle manager.
+ connection = createConnection();
+
+ // Create the session pools, passing in this connection.
+ createSessionPools();
+ }
+
+ return connection;
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* CmsTemplate::createSession()
+throw (cms::CMSException) {
+
+ try {
+
+ // Take a session from the pool.
+ return sessionPools[getSessionAcknowledgeMode()].takeSession();
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroySession( cms::Session* session )
+throw (cms::CMSException) {
+
+ try {
+
+ if( session == NULL ) {
+ return;
+ }
+
+ // Close the session, but do not delete since it's a pooled session
+ session->close();
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Producer* CmsTemplate::createProducer(cms::Session* session,
+ cms::Destination* dest) throw (cms::CMSException) {
+
+ try {
+
+ cms::MessageProducer* producer = session->createProducer(dest);
+ if (!isMessageIdEnabled()) {
+ producer->setDisableMessageID(true);
+ }
+ if (!isMessageTimestampEnabled()) {
+ producer->setDisableMessageTimestamp(true);
+ }
+
+ return producer;
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroyProducer( cms::MessageProducer* producer)
+throw (cms::CMSException) {
+
+ try {
+
+ if( producer == NULL ) {
+ return;
+ }
+
+ // Close the producer, then destroy it.
+ producer->close();
+ delete producer;
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
+ cms::Destination* dest, const std::string& messageSelector)
+ throw (cms::CMSException) {
+
+ try {
+ cms::MessageConsumer* consumer = session->createConsumer(dest,
+ messageSelector,
+ isNoLocal());
+
+ return consumer;
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroyConsumer( cms::MessageConsumer* consumer)
+throw (cms::CMSException) {
+
+ try {
+
+ if( consumer == NULL ) {
+ return;
+ }
+
+ // Close the consumer, then destroy it.
+ consumer->close();
+ delete consumer;
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h?rev=610683&r1=610682&r2=610683&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h Wed Jan 9 21:35:49 2008
@@ -30,112 +30,118 @@
class CmsTemplate : public CmsDestinationAccessor
{
public:
-
+
/**
* Timeout value indicating that a receive operation should
* check if a message is immediately available without blocking.
*/
static const long long RECEIVE_TIMEOUT_NO_WAIT = -1;
-
+
/**
* Timeout value indicating a blocking receive without timeout.
*/
static const long long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0;
-
+
/**
* Default message priority.
*/
static const int DEFAULT_PRIORITY = 4;
-
+
/**
* My default, messages should live forever.
*/
- static const long long DEFAULT_TIME_TO_LIVE = 0;
+ static const long long DEFAULT_TIME_TO_LIVE = 0;
+
+ private:
+
+ static const int NUM_SESSION_POOLS = (int)cms::Session::SESSION_TRANSACTED + 1;
+
+ cms::Connection* connection;
- private:
+ SessionPool*[NUM_SESSION_POOLS] sessionPools;
cms::Destination* defaultDestination;
-
+
std::string defaultDestinationName;
-
+
bool messageIdEnabled;
-
+
bool messageTimestampEnabled;
-
- bool pubSubNoLocal;
-
+
+ bool noLocal;
+
long long receiveTimeout;
-
+
bool explicitQosEnabled;
-
+
int deliveryMode;
-
+
int priority;
-
+
long long timeToLive;
-
+
public:
-
- CmsTemplate();
- CmsTemplate(cms::ConnectionFactory* connectionFactory);
-
- virtual ~CmsTemplate();
-
- /**
+
+ CmsTemplate();
+ CmsTemplate(cms::ConnectionFactory* connectionFactory);
+
+ virtual ~CmsTemplate();
+
+ /**
* Initializes this object and prepares it for use. This should be called
* before any other methds are called.
*/
- virtual void init()
+ virtual void init()
throw (cms::CMSException, decaf::lang::exceptions::IllegalStateException);
-
- virtual void setDefaultDestination(cms::Destination* defaultDestination) {
- this->defaultDestination = defaultDestination;
- }
-
- virtual const cms::Destination* getDefaultDestination() const {
- return this->defaultDestination;
- }
-
- virtual void setDefaultDestinationName(const std::string& defaultDestinationName) {
- this->defaultDestinationName = defaultDestinationName;
- }
-
- virtual const std::string getDefaultDestinationName() const {
+
+ virtual void setDefaultDestination(cms::Destination* defaultDestination) {
+ this->defaultDestination = defaultDestination;
+ }
+
+ virtual const cms::Destination* getDefaultDestination() const {
+ return this->defaultDestination;
+ }
+
+ virtual void setDefaultDestinationName(const std::string& defaultDestinationName) {
+ this->defaultDestinationName = defaultDestinationName;
+ }
+
+ virtual const std::string getDefaultDestinationName() const {
return this->defaultDestinationName;
- }
-
- virtual void setMessageIdEnabled(bool messageIdEnabled) {
- this->messageIdEnabled = messageIdEnabled;
- }
-
- virtual bool isMessageIdEnabled() const {
+ }
+
+ virtual void setMessageIdEnabled(bool messageIdEnabled) {
+ this->messageIdEnabled = messageIdEnabled;
+ }
+
+ virtual bool isMessageIdEnabled() const {
return this->messageIdEnabled;
- }
-
+ }
+
virtual void setMessageTimestampEnabled(bool messageTimestampEnabled) {
this->messageTimestampEnabled = messageTimestampEnabled;
}
-
+
virtual bool isMessageTimestampEnabled() const {
return this->messageTimestampEnabled;
- }
-
- virtual void setPubSubNoLocal(bool pubSubNoLocal) {
- this->pubSubNoLocal = pubSubNoLocal;
- }
-
- virtual bool isPubSubNoLocal() const {
- return this->pubSubNoLocal;
}
-
+
+ virtual void setNoLocal(bool noLocal) {
+ this->noLocal = noLocal;
+ }
+
+ virtual bool isNoLocal() const {
+ return this->noLocal;
+ }
+
virtual void setReceiveTimeout(long long receiveTimeout) {
this->receiveTimeout = receiveTimeout;
}
-
+
virtual long long getReceiveTimeout() const {
return this->receiveTimeout;
}
-
+
/**
* Set if the QOS values (deliveryMode, priority, timeToLive)
* should be used for sending a message.
@@ -147,7 +153,7 @@
virtual void setExplicitQosEnabled(bool explicitQosEnabled) {
this->explicitQosEnabled = explicitQosEnabled;
}
-
+
/**
* If "true", then the values of deliveryMode, priority, and timeToLive
* will be used when sending a message. Otherwise, the default values,
@@ -163,7 +169,7 @@
virtual bool isExplicitQosEnabled() const {
return this->explicitQosEnabled;
}
-
+
/**
* Set whether message delivery should be persistent or non-persistent,
* specified as boolean value ("true" or "false"). This will set the delivery
@@ -175,7 +181,7 @@
virtual void setDeliveryPersistent(bool deliveryPersistent) {
this->deliveryMode = (deliveryPersistent ? cms::DeliveryMode.PERSISTENT : cms::DeliveryMode.NON_PERSISTENT);
}
-
+
/**
* Set the delivery mode to use when sending a message.
* Default is the Message default: "PERSISTENT".
@@ -187,14 +193,14 @@
virtual void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode;
}
-
+
/**
* Return the delivery mode to use when sending a message.
*/
virtual int getDeliveryMode() const {
return this->deliveryMode;
}
-
+
/**
* Set the priority of a message when sending.
* <p>Since a default value may be defined administratively,
@@ -202,17 +208,17 @@
*
* @see #isExplicitQosEnabled
*/
- public void setPriority(int priority) {
+ virtual public void setPriority(int priority) {
this->priority = priority;
}
-
+
/**
* Return the priority of a message when sending.
*/
virtual int getPriority() const {
return this->priority;
}
-
+
/**
* Set the time-to-live of the message when sending.
* <p>Since a default value may be defined administratively,
@@ -224,19 +230,107 @@
virtual void setTimeToLive(long long timeToLive) {
this->timeToLive = timeToLive;
}
-
+
/**
* Return the time-to-live of the message when sending.
*/
virtual long long getTimeToLive() const {
return this->timeToLive;
}
-
+
private:
-
+
+ /**
+ * Initializes all members to their defaults.
+ */
void initDefaults();
- void checkDefaultDestination() throw (decaf::lang::exceptions::IllegalStateException);
+ /**
+ * Creates the session pools objects.
+ */
+ void createSessionPools();
+
+ /**
+ * Destroys the session pool objects.
+ */
+ void destroySessionPools();
+
+ /**
+ * Checks that the default destination is valid, if not throws
+ * an exception.
+ * @throws decaf::lang::exceptions::IllegalStateException thrown
+ * if the default destination is invalid.
+ */
+ void checkDefaultDestination()
+ throw (decaf::lang::exceptions::IllegalStateException);
+
+ /**
+ * Gets the connection, creating it if it doesn't already exist.
+ * @return the connection
+ * @throws cms::CMSException if any of the CMS methods throw.
+ */
+ cms::Connection* getConnection() throw (cms::CMSException);
+
+ /**
+ * Creates a session initialized with the proper values.
+ *
+ * @return the session
+ * @throws cms::CMSException if any of the CMS methods throw.
+ */
+ cms::Session* createSession() throw (cms::CMSException);
+
+ /**
+ * Closes, but does not destroy the pooled session resource.
+ * @aaram session
+ * a pooled session resource
+ * @throws cms::CMSException thrown if the CMS methods throw.
+ */
+ void destroySession( cms::Session* session ) throw (cms::CMSException);
+
+ /**
+ * Allocates a producer initialized with the proper values.
+ *
+ * @param session
+ * The session from which to create a producer
+ * @param dest
+ * The destination for which to create the producer.
+ * @return the producer
+ * @throws cms::CMSException thrown by the CMS API
+ */
+ cms::MessageProducer* createProducer(cms::Session* session,
+ cms::Destination* dest) throw (cms::CMSException);
+
+ /**
+ * Closes and destroys a producer resource
+ * @aaram producer
+ * a producer to destroy
+ * @throws cms::CMSException thrown if the CMS methods throw.
+ */
+ void destroyProducer( cms::MessageProducer* producer ) throw (cms::CMSException);
+
+ /**
+ * Allocates a consumer initialized with the proper values.
+ *
+ * @param session
+ * The session from which to create a consumer
+ * @param dest
+ * The destination for which to create the consumer
+ * @param messageSelector
+ * The message selector for the consumer.
+ * @return the new consumer
+ * @throws cms::CMSException if the CMS methods throw.
+ */
+ cms::MessageConsumer* createConsumer(cms::Session* session,
+ cms::Destination* dest, const std::string& messageSelector)
+ throw (cms::CMSException);
+
+ /**
+ * Closes and destroys a consumer resource
+ * @aaram consumer
+ * a consumer to destroy
+ * @throws cms::CMSException thrown if the CMS methods throw.
+ */
+ void destroyConsumer( cms::MessageConsumer* consumer ) throw (cms::CMSException);
};
}}
Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h?rev=610683&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/SessionCallback.h Wed Jan 9 21:35:49 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H
+#define ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H
+
+namespace activemq {
+namespace cmsutil {
+
+ /**
+ * Callback for executing any number of operations on a provided
+ * CMS Session.
+ */
+ class SessionCallback {
+
+ virtual ~SessionCallback();
+
+ /**
+ * Execute any number of operations against the supplied CMS
+ * session.
+ *
+ * @param session
+ * the CMS <code>Session</code>
+ * @throws cms::CMSException if thrown by CMS API methods
+ */
+ virtual void doInCms(cms::Session* session) throw (cms::CMSException) = 0;
+
+ }
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_SESSIONCALLBACK_H*/