You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2008/01/12 22:16:30 UTC
svn commit: r611484 - in
/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil: CmsTemplate.cpp
CmsTemplate.h MessageCreator.h ProducerCallback.h
Author: nmittler
Date: Sat Jan 12 13:16:29 2008
New Revision: 611484
URL: http://svn.apache.org/viewvc?rev=611484&view=rev
Log:
AMQCPP-152 - Adding classes for support of CmsTemplate
Added:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/MessageCreator.h
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/ProducerCallback.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp?rev=611484&r1=611483&r2=611484&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp Sat Jan 12 13:16:29 2008
@@ -113,6 +113,9 @@
// to the resource lifecycle manager.
connection = createConnection();
+ // Start the connection.
+ connection->start();
+
// Create the session pools, passing in this connection.
createSessionPools();
}
@@ -123,7 +126,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Session* CmsTemplate::createSession()
+PooledSession* CmsTemplate::takeSession()
throw (cms::CMSException) {
try {
@@ -135,7 +138,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void CmsTemplate::destroySession( cms::Session* session )
+void CmsTemplate::returnSession( PooledSession*& session )
throw (cms::CMSException) {
try {
@@ -146,6 +149,7 @@
// Close the session, but do not delete since it's a pooled session
session->close();
+ session = NULL;
}
AMQ_CATCH_RETHROW( cms::CMSException )
}
@@ -156,6 +160,10 @@
try {
+ if( dest == NULL ) {
+ dest = getDefaultDestination();
+ }
+
cms::MessageProducer* producer = session->createProducer(dest);
if (!isMessageIdEnabled()) {
producer->setDisableMessageID(true);
@@ -170,20 +178,22 @@
}
////////////////////////////////////////////////////////////////////////////////
-void CmsTemplate::destroyProducer( cms::MessageProducer* producer)
+void CmsTemplate::destroyProducer( cms::MessageProducer*& producer)
throw (cms::CMSException) {
- try {
-
- if( producer == NULL ) {
- return;
- }
+ if( producer == NULL ) {
+ return;
+ }
+
+ try {
// Close the producer, then destroy it.
- producer->close();
- delete producer;
+ producer->close();
}
- AMQ_CATCH_RETHROW( cms::CMSException )
+ AMQ_CATCH_NO_RETHROW( cms::CMSException )
+
+ delete producer;
+ producer = NULL;
}
////////////////////////////////////////////////////////////////////////////////
@@ -202,18 +212,144 @@
}
////////////////////////////////////////////////////////////////////////////////
-void CmsTemplate::destroyConsumer( cms::MessageConsumer* consumer)
+void CmsTemplate::destroyConsumer( cms::MessageConsumer*& consumer)
throw (cms::CMSException) {
- try {
+ if( consumer == NULL ) {
+ return;
+ }
+
+ try {
+
+ // Close the consumer, then destroy it.
+ consumer->close();
+ }
+ AMQ_CATCH_NO_RETHROW( cms::CMSException )
+
+ delete consumer;
+ consumer = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::destroyMessage( cms::Message*& message) {
+
+ if( message == NULL ) {
+ return;
+ }
+
+ // Destroy the message.
+ delete message;
+ message = NULL;
+}
- if( consumer == NULL ) {
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::execute(SessionCallback* action) throw (cms::CMSException) {
+
+ PooledSession* pooledSession = NULL;
+
+ try {
+
+ if( action == NULL ) {
return;
}
+
+ // Take a session from the pool.
+ pooledSession = takeSession();
- // Close the consumer, then destroy it.
- consumer->close();
- delete consumer;
+ // Execute the action with the given session.
+ action->doInCms(pooledSession);
+
+ // Return the session to the pool.
+ returnSession(pooledSession);
+
+ } catch( cms::CMSException& e ) {
+
+ e.setMark(__FILE__, __LINE__);
+
+ // Return the session to the pool.
+ returnSession(pooledSession);
+
+ throw e;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::execute(ProducerCallback* action) throw (cms::CMSException) {
+
+ try {
+
+ // Create the callback.
+ ProducerSessionCallback cb(action);
+
+ // Execute the action in a session.
+ execute(&cb);
}
AMQ_CATCH_RETHROW( cms::CMSException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::ProducerSessionCallback::doInCms( cms::Session* session )
+throw (cms::CMSException) {
+
+ MessageProducer* producer = NULL;
+
+ try {
+
+ if( session == NULL ) {
+ return;
+ }
+
+ // Create the producer.
+ producer = createProducer(session, null);
+
+ // Execute the action.
+ action->doInCms(session, producer);
+
+ // Destroy the producer.
+ destroyProducer(producer);
+
+ } catch( cms::CMSException& e) {
+
+ // Destroy the producer.
+ destroyProducer(producer);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::doSend(cms::Session* session, cms::Destination* dest,
+ MessageCreator* messageCreator) throw (cms::CMSException) {
+
+ cms::MessageProducer* producer = NULL;
+ cms::Message* message = NULL;
+
+ try {
+
+ if( session == NULL || dest == NULL) {
+ return;
+ }
+
+ // Create the producer.
+ producer = createProducer(session, dest);
+
+ // Create the message.
+ message = messageCreator->createMessage(session);
+
+ // Send the message.
+ if( isExplicitQosEnabled() ) {
+ producer->send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ } else {
+ producer->send(message);
+ }
+
+ // Destroy the resources.
+ destroyProducer(producer);
+ destroyMessage(message);
+
+ } catch( cms::CMSException& e) {
+
+ // Destroy the resources.
+ destroyProducer(producer);
+ destroyMessage(message);
+ }
+}
+
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h?rev=611484&r1=611483&r2=611484&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h Sat Jan 12 13:16:29 2008
@@ -18,7 +18,8 @@
#ifndef ACTIVEMQ_CMSUTIL_CMSTEMPLATE_H_
#define ACTIVEMQ_CMSUTIL_CMSTEMPLATE_H_
-#include <activemq/cms/CmsDestinationAccessor.h>
+#include <activemq/cmsutil/CmsDestinationAccessor.h>
+#include <activemq/cmsutil/SessionCallback.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
#include <cms/ConnectionFactory.h>
#include <cms/DeliveryMode.h>
@@ -27,6 +28,9 @@
namespace activemq {
namespace cmsutil {
+ // Forward declarations.
+ class SessionCallback;
+
class CmsTemplate : public CmsDestinationAccessor
{
public:
@@ -53,6 +57,25 @@
static const long long DEFAULT_TIME_TO_LIVE = 0;
private:
+
+ /**
+ * Session callback that executes a producer callback.
+ */
+ class ProducerSessionCallback : public SessionCallback {
+ private:
+
+ ProducerCallback* action;
+
+ public:
+
+ ProducerSessionCallback(PoducerCallback* action){
+ this->action = action;
+ }
+
+ virtual ~ProducerSessionCallback() {}
+
+ virtual void doInCms(cms::Session* session) throw (cms::CMSException);
+ };
static const int NUM_SESSION_POOLS = (int)cms::Session::SESSION_TRANSACTED + 1;
@@ -237,6 +260,24 @@
virtual long long getTimeToLive() const {
return this->timeToLive;
}
+
+ /**
+ * Executes the given action within a CMS Session.
+ * @param action
+ * the action to perform within a CMS Session
+ * @throws cms::CMSException thrown if an error occurs.
+ */
+ virtual void execute(SessionCallback* action) throw (cms::CMSException);
+
+ /**
+ * Executes the given action and provides it with a CMS Session and
+ * producer
+ *
+ * @param action
+ * the action to perform
+ * @throws cms::CMSException thrown if an error occurs.
+ */
+ virtual void execute(ProducerCallback* action) throw (cms::CMSException);
private:
@@ -277,7 +318,7 @@
* @return the session
* @throws cms::CMSException if any of the CMS methods throw.
*/
- cms::Session* createSession() throw (cms::CMSException);
+ PooledSession* takeSession() throw (cms::CMSException);
/**
* Closes, but does not destroy the pooled session resource.
@@ -285,7 +326,7 @@
* a pooled session resource
* @throws cms::CMSException thrown if the CMS methods throw.
*/
- void destroySession( cms::Session* session ) throw (cms::CMSException);
+ void returnSession( PooledSession*& session ) throw (cms::CMSException);
/**
* Allocates a producer initialized with the proper values.
@@ -293,7 +334,8 @@
* @param session
* The session from which to create a producer
* @param dest
- * The destination for which to create the producer.
+ * The destination for which to create the producer. If
+ * this is NULL, the default will be used.
* @return the producer
* @throws cms::CMSException thrown by the CMS API
*/
@@ -306,7 +348,7 @@
* a producer to destroy
* @throws cms::CMSException thrown if the CMS methods throw.
*/
- void destroyProducer( cms::MessageProducer* producer ) throw (cms::CMSException);
+ void destroyProducer( cms::MessageProducer*& producer ) throw (cms::CMSException);
/**
* Allocates a consumer initialized with the proper values.
@@ -330,7 +372,27 @@
* a consumer to destroy
* @throws cms::CMSException thrown if the CMS methods throw.
*/
- void destroyConsumer( cms::MessageConsumer* consumer ) throw (cms::CMSException);
+ void destroyConsumer( cms::MessageConsumer*& consumer ) throw (cms::CMSException);
+
+ /**
+ * Destroys the given message
+ * @param message
+ * the message to destroy
+ */
+ void destroyMessage( cms::Message*& message );
+
+ /**
+ * Sends a message to a destination.
+ * @param session
+ * the session to be used.
+ * @param dest
+ * the destination to send the message on.
+ * @param messageCreator
+ * creates the message to be sent
+ * @throws cms::CMSException thrown if the CMS API throws.
+ */
+ void doSend(cms::Session* session, cms::Destination* dest,
+ MessageCreator* messageCreator) throw (cms::CMSException);
};
}}
Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/MessageCreator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/MessageCreator.h?rev=611484&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/MessageCreator.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/MessageCreator.h Sat Jan 12 13:16:29 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_MESSAGECREATOR_H
+#define ACTIVEMQ_CMSUTIL_MESSAGECREATOR_H
+
+namespace activemq {
+namespace cmsutil {
+
+ /**
+ * Creates the user-defined message to be sent by the
+ * <code>CmsTemplate</code>.
+ */
+ class MessageCreator {
+
+ virtual ~MessageCreator();
+
+ /**
+ * Creates a message from the given session.
+ *
+ * @param session
+ * the CMS <code>Session</code>
+ * @throws cms::CMSException if thrown by CMS API methods
+ */
+ virtual cms::Message* createMessage(cms::Session* session )
+ throw (cms::CMSException);
+
+ }
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_MESSAGECREATOR_H*/
Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/ProducerCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/ProducerCallback.h?rev=611484&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/ProducerCallback.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/ProducerCallback.h Sat Jan 12 13:16:29 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_PRODUCERCALLBACK_H
+#define ACTIVEMQ_CMSUTIL_PRODUCERCALLBACK_H
+
+namespace activemq {
+namespace cmsutil {
+
+ /**
+ * Callback for sending a message to a CMS destination.
+ */
+ class ProducernCallback {
+
+ virtual ~ProducernCallback();
+
+ /**
+ * Execute an action given a session and producer.
+ *
+ * @param session
+ * the CMS <code>Session</code>
+ * @param producer
+ * the CMS <code>Producer</code>
+ * @throws cms::CMSException if thrown by CMS API methods
+ */
+ virtual void doInCms(cms::Session* session,
+ cms::MessageProducer* producer) throw (cms::CMSException) = 0;
+
+ }
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_PRODUCERCALLBACK_H*/