You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/06 05:18:03 UTC
svn commit: r419422 [2/11] - in
/incubator/activemq/trunk/openwire-cpp/src/main/cpp: activemq/
activemq/command/ activemq/protocol/openwire/ activemq/transport/
activemq/transport/tcp/ cms/ ppr/ ppr/io/ ppr/io/encoding/ ppr/net/
ppr/thread/ ppr/util/
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp Wed Jul 5 20:17:58 2006
@@ -1,514 +1,514 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/Session.hpp"
-#include "activemq/command/ActiveMQDestination.hpp"
-#include "activemq/command/ActiveMQQueue.hpp"
-#include "activemq/command/ActiveMQTopic.hpp"
-#include "activemq/command/ActiveMQTempQueue.hpp"
-#include "activemq/command/ActiveMQTempTopic.hpp"
-#include "activemq/command/ActiveMQMessage.hpp"
-#include "activemq/command/ActiveMQBytesMessage.hpp"
-#include "activemq/command/ActiveMQMapMessage.hpp"
-#include "activemq/command/ActiveMQTextMessage.hpp"
-#include "activemq/command/ProducerInfo.hpp"
-#include "activemq/command/ConsumerInfo.hpp"
-#include "activemq/command/MessageAck.hpp"
-#include "activemq/MessageConsumer.hpp"
-#include "activemq/MessageProducer.hpp"
-#include "activemq/Connection.hpp"
-
-using namespace apache::activemq;
-
-
-// Constructors -----------------------------------------------------
-
-/*
- *
- */
-Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
-{
- this->connection = connection ;
- this->sessionInfo = info ;
- this->ackMode = ackMode ;
- this->prefetchSize = 1000 ;
- this->consumerCounter = 0 ;
- this->producerCounter = 0 ;
- this->transactionContext = new TransactionContext(smartify(this)) ;
- this->dispatchThread = new DispatchThread(smartify(this)) ;
- this->closed = false ;
-
- // Activate backround dispatch thread
- dispatchThread->start() ;
-}
-
-/*
- *
- */
-Session::~Session()
-{
- // Make sure session is closed
- close() ;
-}
-
-
-// Attribute methods ------------------------------------------------
-
-/*
- *
- */
-bool Session::isTransacted()
-{
- return ( ackMode == TransactionalAckMode ) ? true : false ;
-}
-
-/*
- *
- */
-p<Connection> Session::getConnection()
-{
- return connection ;
-}
-
-/*
- *
- */
-p<SessionId> Session::getSessionId()
-{
- return sessionInfo->getSessionId() ;
-}
-
-/*
- *
- */
-p<TransactionContext> Session::getTransactionContext()
-{
- return transactionContext ;
-}
-
-/*
- *
- */
-p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
-{
- map<long long, p<MessageConsumer> >::iterator tempIter ;
-
- // Check if key exists in map
- tempIter = consumers.find( consumerId->getValue() ) ;
- if( tempIter == consumers.end() )
- return NULL ;
- else
- return tempIter->second ;
-}
-
-/*
- *
- */
-p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
-{
- map<long long, p<MessageProducer> >::iterator tempIter ;
-
- // Check if key exists in map
- tempIter = producers.find( producerId->getValue() ) ;
- if( tempIter == producers.end() )
- return NULL ;
- else
- return tempIter->second ;
-}
-
-
-// Operation methods ------------------------------------------------
-
-/*
- *
- */
-p<IMessageProducer> Session::createProducer()
-{
- return createProducer(NULL) ;
-}
-
-/*
- *
- */
-p<IMessageProducer> Session::createProducer(p<IDestination> destination)
-{
- p<ProducerInfo> command = createProducerInfo(destination) ;
- p<ProducerId> producerId = command->getProducerId() ;
-
- try
- {
- p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
-
- // Save the producer
- producers[ producerId->getValue() ] = producer ;
-
- // Register producer with broker
- connection->syncRequest(command) ;
-
- return producer ;
- }
- catch( exception e )
- {
- // Make sure producer was removed
- producers[ producerId->getValue() ] = NULL ;
- throw e ;
- }
-}
-
-/*
- *
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
-{
- return createConsumer(destination, NULL) ;
-}
-
-/*
- *
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
-{
- p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
- p<ConsumerId> consumerId = command->getConsumerId() ;
-
- try
- {
- p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
- // Save the consumer first in case message dispatching starts immediately
- consumers[ consumerId->getValue() ] = consumer ;
-
- // Register consumer with broker
- connection->syncRequest(command) ;
-
- return consumer ;
- }
- catch( exception e )
- {
- // Make sure consumer was removed
- consumers[ consumerId->getValue() ] = NULL ;
- throw e ;
- }
-}
-
-p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
-{
- p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
- p<ConsumerId> consumerId = command->getConsumerId() ;
- p<string> subscriptionName = new string(name) ;
-
- command->setSubcriptionName( subscriptionName ) ;
- command->setNoLocal( noLocal ) ;
-
- try
- {
- p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
- // Save the consumer first in case message dispatching starts immediately
- consumers[ consumerId->getValue() ] = consumer ;
-
- // Register consumer with broker
- connection->syncRequest(command) ;
-
- return consumer ;
- }
- catch( exception e )
- {
- // Make sure consumer was removed
- consumers[ consumerId->getValue() ] = NULL ;
- throw e ;
- }
-}
-
-/*
- *
- */
-p<IQueue> Session::getQueue(const char* name)
-{
- p<IQueue> queue = new ActiveMQQueue(name) ;
- return queue ;
-}
-
-/*
- *
- */
-p<ITopic> Session::getTopic(const char* name)
-{
- p<ITopic> topic = new ActiveMQTopic(name) ;
- return topic ;
-}
-
-/*
- *
- */
-p<ITemporaryQueue> Session::createTemporaryQueue()
-{
- p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
- return queue ;
-}
-
-/*
- *
- */
-p<ITemporaryTopic> Session::createTemporaryTopic()
-{
- p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
- return topic ;
-}
-
-/*
- *
- */
-p<IMessage> Session::createMessage()
-{
- p<IMessage> message = new ActiveMQMessage() ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-p<IBytesMessage> Session::createBytesMessage()
-{
- p<IBytesMessage> message = new ActiveMQBytesMessage() ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-p<IBytesMessage> Session::createBytesMessage(char* body, int size)
-{
- p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-p<IMapMessage> Session::createMapMessage()
-{
- p<IMapMessage> message = new ActiveMQMapMessage() ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-p<ITextMessage> Session::createTextMessage()
-{
- p<ITextMessage> message = new ActiveMQTextMessage() ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-p<ITextMessage> Session::createTextMessage(const char* text)
-{
- p<ITextMessage> message = new ActiveMQTextMessage(text) ;
- configure(message) ;
- return message ;
-}
-
-/*
- *
- */
-void Session::commit() throw(CmsException)
-{
- if( !isTransacted() )
- throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
- transactionContext->commit() ;
-}
-
-/*
- *
- */
-void Session::rollback() throw(CmsException)
-{
- if( !isTransacted() )
- throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
- transactionContext->rollback() ;
-
- map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
- // Ensure all the consumers redeliver any rolled back messages
- for( tempIter = consumers.begin() ;
- tempIter != consumers.end() ;
- tempIter++ )
- {
- ((*tempIter).second)->redeliverRolledBackMessages() ;
- }
-}
-
-/*
- *
- */
-void Session::doSend(p<IDestination> destination, p<IMessage> message)
-{
- p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
- // TODO complete packet
- connection->syncRequest(command) ;
-}
-
-/*
- * Starts a new transaction
- */
-void Session::doStartTransaction()
-{
- if( isTransacted() )
- transactionContext->begin() ;
-}
-
-/*
- *
- */
-void Session::dispatch(int delay)
-{
- if( delay > 0 )
- dispatchThread->sleep(delay) ;
-
- dispatchThread->wakeup() ;
-}
-
-/*
- *
- */
-void Session::dispatchAsyncMessages()
-{
- // Ensure that only 1 thread dispatches messages in a consumer at once
- LOCKED_SCOPE (mutex);
-
- map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
- // Iterate through each consumer created by this session
- // ensuring that they have all pending messages dispatched
- for( tempIter = consumers.begin() ;
- tempIter != consumers.end() ;
- tempIter++ )
- {
- ((*tempIter).second)->dispatchAsyncMessages() ;
- }
-}
-
-/*
- *
- */
-void Session::close()
-{
- if( !closed )
- {
- map<long long, p<MessageConsumer> >::iterator consumerIter ;
- map<long long, p<MessageProducer> >::iterator producerIter ;
-
- // Shutdown dispatch thread
- dispatchThread->interrupt() ;
- dispatchThread->join() ;
- dispatchThread = NULL ;
-
- // Iterate through all consumers and close them down
- for( consumerIter = consumers.begin() ;
- consumerIter != consumers.end() ;
- consumerIter++ )
- {
- consumerIter->second->close() ;
- consumerIter->second = NULL ;
- }
-
- // Iterate through all producers and close them down
- for( producerIter = producers.begin() ;
- producerIter != producers.end() ;
- producerIter++ )
- {
- producerIter->second->close() ;
- producerIter->second = NULL ;
- }
- // De-register session from broker/connection
- connection->disposeOf( sessionInfo->getSessionId() ) ;
-
- // Clean up
- connection = NULL ;
- closed = true ;
-
- }
-}
-
-
-// Implementation methods ------------------------------------------
-
-/*
- *
- */
-p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
-{
- p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
- p<ConsumerId> consumerId = new ConsumerId() ;
-
- consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
- consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
- {
- LOCKED_SCOPE (mutex);
- consumerId->setValue( ++consumerCounter ) ;
- }
- p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
-
- // TODO complete packet
- consumerInfo->setConsumerId( consumerId ) ;
- consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
- consumerInfo->setSelector( sel ) ;
- consumerInfo->setPrefetchSize( this->prefetchSize ) ;
-
- return consumerInfo ;
-}
-
-/*
- *
- */
-p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
-{
- p<ProducerInfo> producerInfo = new ProducerInfo() ;
- p<ProducerId> producerId = new ProducerId() ;
-
- producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
- producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
- {
- LOCKED_SCOPE (mutex);
- producerId->setValue( ++producerCounter ) ;
- }
-
- // TODO complete packet
- producerInfo->setProducerId( producerId ) ;
- producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
-
- return producerInfo ;
-}
-
-/*
- * Configures the message command.
- */
-void Session::configure(p<IMessage> message)
-{
- // TODO:
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Session.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+#include "activemq/command/ActiveMQMapMessage.hpp"
+#include "activemq/command/ActiveMQTextMessage.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Connection.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ *
+ */
+Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
+{
+ this->connection = connection ;
+ this->sessionInfo = info ;
+ this->ackMode = ackMode ;
+ this->prefetchSize = 1000 ;
+ this->consumerCounter = 0 ;
+ this->producerCounter = 0 ;
+ this->transactionContext = new TransactionContext(smartify(this)) ;
+ this->dispatchThread = new DispatchThread(smartify(this)) ;
+ this->closed = false ;
+
+ // Activate backround dispatch thread
+ dispatchThread->start() ;
+}
+
+/*
+ *
+ */
+Session::~Session()
+{
+ // Make sure session is closed
+ close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+/*
+ *
+ */
+bool Session::isTransacted()
+{
+ return ( ackMode == TransactionalAckMode ) ? true : false ;
+}
+
+/*
+ *
+ */
+p<Connection> Session::getConnection()
+{
+ return connection ;
+}
+
+/*
+ *
+ */
+p<SessionId> Session::getSessionId()
+{
+ return sessionInfo->getSessionId() ;
+}
+
+/*
+ *
+ */
+p<TransactionContext> Session::getTransactionContext()
+{
+ return transactionContext ;
+}
+
+/*
+ *
+ */
+p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
+{
+ map<long long, p<MessageConsumer> >::iterator tempIter ;
+
+ // Check if key exists in map
+ tempIter = consumers.find( consumerId->getValue() ) ;
+ if( tempIter == consumers.end() )
+ return NULL ;
+ else
+ return tempIter->second ;
+}
+
+/*
+ *
+ */
+p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
+{
+ map<long long, p<MessageProducer> >::iterator tempIter ;
+
+ // Check if key exists in map
+ tempIter = producers.find( producerId->getValue() ) ;
+ if( tempIter == producers.end() )
+ return NULL ;
+ else
+ return tempIter->second ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+p<IMessageProducer> Session::createProducer()
+{
+ return createProducer(NULL) ;
+}
+
+/*
+ *
+ */
+p<IMessageProducer> Session::createProducer(p<IDestination> destination)
+{
+ p<ProducerInfo> command = createProducerInfo(destination) ;
+ p<ProducerId> producerId = command->getProducerId() ;
+
+ try
+ {
+ p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
+
+ // Save the producer
+ producers[ producerId->getValue() ] = producer ;
+
+ // Register producer with broker
+ connection->syncRequest(command) ;
+
+ return producer ;
+ }
+ catch( exception e )
+ {
+ // Make sure producer was removed
+ producers[ producerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+/*
+ *
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
+{
+ return createConsumer(destination, NULL) ;
+}
+
+/*
+ *
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
+{
+ p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
+ p<ConsumerId> consumerId = command->getConsumerId() ;
+
+ try
+ {
+ p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+ // Save the consumer first in case message dispatching starts immediately
+ consumers[ consumerId->getValue() ] = consumer ;
+
+ // Register consumer with broker
+ connection->syncRequest(command) ;
+
+ return consumer ;
+ }
+ catch( exception e )
+ {
+ // Make sure consumer was removed
+ consumers[ consumerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
+{
+ p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
+ p<ConsumerId> consumerId = command->getConsumerId() ;
+ p<string> subscriptionName = new string(name) ;
+
+ command->setSubcriptionName( subscriptionName ) ;
+ command->setNoLocal( noLocal ) ;
+
+ try
+ {
+ p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+ // Save the consumer first in case message dispatching starts immediately
+ consumers[ consumerId->getValue() ] = consumer ;
+
+ // Register consumer with broker
+ connection->syncRequest(command) ;
+
+ return consumer ;
+ }
+ catch( exception e )
+ {
+ // Make sure consumer was removed
+ consumers[ consumerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+/*
+ *
+ */
+p<IQueue> Session::getQueue(const char* name)
+{
+ p<IQueue> queue = new ActiveMQQueue(name) ;
+ return queue ;
+}
+
+/*
+ *
+ */
+p<ITopic> Session::getTopic(const char* name)
+{
+ p<ITopic> topic = new ActiveMQTopic(name) ;
+ return topic ;
+}
+
+/*
+ *
+ */
+p<ITemporaryQueue> Session::createTemporaryQueue()
+{
+ p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
+ return queue ;
+}
+
+/*
+ *
+ */
+p<ITemporaryTopic> Session::createTemporaryTopic()
+{
+ p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
+ return topic ;
+}
+
+/*
+ *
+ */
+p<IMessage> Session::createMessage()
+{
+ p<IMessage> message = new ActiveMQMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IBytesMessage> Session::createBytesMessage()
+{
+ p<IBytesMessage> message = new ActiveMQBytesMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IBytesMessage> Session::createBytesMessage(char* body, int size)
+{
+ p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IMapMessage> Session::createMapMessage()
+{
+ p<IMapMessage> message = new ActiveMQMapMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<ITextMessage> Session::createTextMessage()
+{
+ p<ITextMessage> message = new ActiveMQTextMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<ITextMessage> Session::createTextMessage(const char* text)
+{
+ p<ITextMessage> message = new ActiveMQTextMessage(text) ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+void Session::commit() throw(CmsException)
+{
+ if( !isTransacted() )
+ throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+ transactionContext->commit() ;
+}
+
+/*
+ *
+ */
+void Session::rollback() throw(CmsException)
+{
+ if( !isTransacted() )
+ throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+ transactionContext->rollback() ;
+
+ map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+ // Ensure all the consumers redeliver any rolled back messages
+ for( tempIter = consumers.begin() ;
+ tempIter != consumers.end() ;
+ tempIter++ )
+ {
+ ((*tempIter).second)->redeliverRolledBackMessages() ;
+ }
+}
+
+/*
+ *
+ */
+void Session::doSend(p<IDestination> destination, p<IMessage> message)
+{
+ p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
+ // TODO complete packet
+ connection->syncRequest(command) ;
+}
+
+/*
+ * Starts a new transaction
+ */
+void Session::doStartTransaction()
+{
+ if( isTransacted() )
+ transactionContext->begin() ;
+}
+
+/*
+ *
+ */
+void Session::dispatch(int delay)
+{
+ if( delay > 0 )
+ dispatchThread->sleep(delay) ;
+
+ dispatchThread->wakeup() ;
+}
+
+/*
+ *
+ */
+void Session::dispatchAsyncMessages()
+{
+ // Ensure that only 1 thread dispatches messages in a consumer at once
+ LOCKED_SCOPE (mutex);
+
+ map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+ // Iterate through each consumer created by this session
+ // ensuring that they have all pending messages dispatched
+ for( tempIter = consumers.begin() ;
+ tempIter != consumers.end() ;
+ tempIter++ )
+ {
+ ((*tempIter).second)->dispatchAsyncMessages() ;
+ }
+}
+
+/*
+ *
+ */
+void Session::close()
+{
+ if( !closed )
+ {
+ map<long long, p<MessageConsumer> >::iterator consumerIter ;
+ map<long long, p<MessageProducer> >::iterator producerIter ;
+
+ // Shutdown dispatch thread
+ dispatchThread->interrupt() ;
+ dispatchThread->join() ;
+ dispatchThread = NULL ;
+
+ // Iterate through all consumers and close them down
+ for( consumerIter = consumers.begin() ;
+ consumerIter != consumers.end() ;
+ consumerIter++ )
+ {
+ consumerIter->second->close() ;
+ consumerIter->second = NULL ;
+ }
+
+ // Iterate through all producers and close them down
+ for( producerIter = producers.begin() ;
+ producerIter != producers.end() ;
+ producerIter++ )
+ {
+ producerIter->second->close() ;
+ producerIter->second = NULL ;
+ }
+ // De-register session from broker/connection
+ connection->disposeOf( sessionInfo->getSessionId() ) ;
+
+ // Clean up
+ connection = NULL ;
+ closed = true ;
+
+ }
+}
+
+
+// Implementation methods ------------------------------------------
+
+/*
+ *
+ */
+p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
+{
+ p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
+ p<ConsumerId> consumerId = new ConsumerId() ;
+
+ consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+ consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+ {
+ LOCKED_SCOPE (mutex);
+ consumerId->setValue( ++consumerCounter ) ;
+ }
+ p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
+
+ // TODO complete packet
+ consumerInfo->setConsumerId( consumerId ) ;
+ consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+ consumerInfo->setSelector( sel ) ;
+ consumerInfo->setPrefetchSize( this->prefetchSize ) ;
+
+ return consumerInfo ;
+}
+
+/*
+ *
+ */
+p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
+{
+ p<ProducerInfo> producerInfo = new ProducerInfo() ;
+ p<ProducerId> producerId = new ProducerId() ;
+
+ producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+ producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+ {
+ LOCKED_SCOPE (mutex);
+ producerId->setValue( ++producerCounter ) ;
+ }
+
+ // TODO complete packet
+ producerInfo->setProducerId( producerId ) ;
+ producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+
+ return producerInfo ;
+}
+
+/*
+ * Configures the message command.
+ */
+void Session::configure(p<IMessage> message)
+{
+ // TODO:
+}
Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp Wed Jul 5 20:17:58 2006
@@ -1,149 +1,149 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/TransactionContext.hpp"
-#include "activemq/Session.hpp"
-
-using namespace apache::activemq;
-
-/*
- *
- */
-TransactionContext::TransactionContext(p<Session> session)
-{
- this->session = session ;
- this->transactionId = NULL ;
-}
-
-/*
- *
- */
-TransactionContext::~TransactionContext()
-{
- // no-op
-}
-
-/*
- *
- */
-p<TransactionId> TransactionContext::getTransactionId()
-{
- return transactionId ;
-}
-
-/*
- *
- */
-void TransactionContext::addSynchronization(p<ISynchronization> synchronization)
-{
- synchronizations.push_back(synchronization) ;
-}
-
-/*
- *
- */
-void TransactionContext::begin()
-{
- if( transactionId == NULL )
- {
- // Create a new local transaction id
- transactionId = session->getConnection()->createLocalTransactionId() ;
-
- // Create a new transaction command
- p<TransactionInfo> info = new TransactionInfo() ;
- info->setConnectionId( session->getConnection()->getConnectionId() ) ;
- info->setTransactionId( transactionId ) ;
- info->setType( BeginTx ) ;
-
- // Send begin command to broker
- session->getConnection()->oneway(info) ;
- }
-}
-
-/*
- *
- */
-void TransactionContext::commit()
-{
- list< p<ISynchronization> >::const_iterator tempIter ;
-
- // Iterate through each synchronization and call beforeCommit()
- for( tempIter = synchronizations.begin() ;
- tempIter != synchronizations.end() ;
- tempIter++ )
- {
- (*tempIter)->beforeCommit() ;
- }
-
- if( transactionId != NULL )
- {
- // Create a new transaction command
- p<TransactionInfo> info = new TransactionInfo() ;
- info->setConnectionId( session->getConnection()->getConnectionId() ) ;
- info->setTransactionId( transactionId ) ;
- info->setType( CommitOnePhaseTx ) ;
-
- // Reset transaction
- transactionId = NULL ;
-
- // Send commit command to broker
- session->getConnection()->oneway(info) ;
- }
-
- // Iterate through each synchronization and call afterCommit()
- for( tempIter = synchronizations.begin() ;
- tempIter != synchronizations.end() ;
- tempIter++ )
- {
- (*tempIter)->afterCommit() ;
- }
-
- // Clear all syncronizations
- synchronizations.clear() ;
-}
-
-/*
- *
- */
-void TransactionContext::rollback()
-{
- if( transactionId != NULL )
- {
- // Create a new transaction command
- p<TransactionInfo> info = new TransactionInfo() ;
- info->setConnectionId( session->getConnection()->getConnectionId() ) ;
- info->setTransactionId( transactionId ) ;
- info->setType( RollbackTx ) ;
-
- // Reset transaction
- transactionId = NULL ;
-
- // Send rollback command to broker
- session->getConnection()->oneway(info) ;
- }
-
- list< p<ISynchronization> >::const_iterator tempIter ;
-
- // Iterate through each synchronization and call afterRollback()
- for( tempIter = synchronizations.begin() ;
- tempIter != synchronizations.end() ;
- tempIter++ )
- {
- (*tempIter)->afterRollback() ;
- }
- // Clear all syncronizations
- synchronizations.clear() ;
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/TransactionContext.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+TransactionContext::TransactionContext(p<Session> session)
+{
+ this->session = session ;
+ this->transactionId = NULL ;
+}
+
+/*
+ *
+ */
+TransactionContext::~TransactionContext()
+{
+ // no-op
+}
+
+/*
+ *
+ */
+p<TransactionId> TransactionContext::getTransactionId()
+{
+ return transactionId ;
+}
+
+/*
+ *
+ */
+void TransactionContext::addSynchronization(p<ISynchronization> synchronization)
+{
+ synchronizations.push_back(synchronization) ;
+}
+
+/*
+ *
+ */
+void TransactionContext::begin()
+{
+ if( transactionId == NULL )
+ {
+ // Create a new local transaction id
+ transactionId = session->getConnection()->createLocalTransactionId() ;
+
+ // Create a new transaction command
+ p<TransactionInfo> info = new TransactionInfo() ;
+ info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+ info->setTransactionId( transactionId ) ;
+ info->setType( BeginTx ) ;
+
+ // Send begin command to broker
+ session->getConnection()->oneway(info) ;
+ }
+}
+
+/*
+ *
+ */
+void TransactionContext::commit()
+{
+ list< p<ISynchronization> >::const_iterator tempIter ;
+
+ // Iterate through each synchronization and call beforeCommit()
+ for( tempIter = synchronizations.begin() ;
+ tempIter != synchronizations.end() ;
+ tempIter++ )
+ {
+ (*tempIter)->beforeCommit() ;
+ }
+
+ if( transactionId != NULL )
+ {
+ // Create a new transaction command
+ p<TransactionInfo> info = new TransactionInfo() ;
+ info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+ info->setTransactionId( transactionId ) ;
+ info->setType( CommitOnePhaseTx ) ;
+
+ // Reset transaction
+ transactionId = NULL ;
+
+ // Send commit command to broker
+ session->getConnection()->oneway(info) ;
+ }
+
+ // Iterate through each synchronization and call afterCommit()
+ for( tempIter = synchronizations.begin() ;
+ tempIter != synchronizations.end() ;
+ tempIter++ )
+ {
+ (*tempIter)->afterCommit() ;
+ }
+
+ // Clear all syncronizations
+ synchronizations.clear() ;
+}
+
+/*
+ *
+ */
+void TransactionContext::rollback()
+{
+ if( transactionId != NULL )
+ {
+ // Create a new transaction command
+ p<TransactionInfo> info = new TransactionInfo() ;
+ info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+ info->setTransactionId( transactionId ) ;
+ info->setType( RollbackTx ) ;
+
+ // Reset transaction
+ transactionId = NULL ;
+
+ // Send rollback command to broker
+ session->getConnection()->oneway(info) ;
+ }
+
+ list< p<ISynchronization> >::const_iterator tempIter ;
+
+ // Iterate through each synchronization and call afterRollback()
+ for( tempIter = synchronizations.begin() ;
+ tempIter != synchronizations.end() ;
+ tempIter++ )
+ {
+ (*tempIter)->afterRollback() ;
+ }
+ // Clear all syncronizations
+ synchronizations.clear() ;
+}
Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp Wed Jul 5 20:17:58 2006
@@ -1,391 +1,391 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/command/ActiveMQBytesMessage.hpp"
-
-using namespace apache::activemq::command;
-
-
-/*
- *
- */
-ActiveMQBytesMessage::ActiveMQBytesMessage()
-{
- this->bis = NULL ;
- this->bos = new ByteArrayOutputStream() ;
- this->dis = NULL ;
- this->dos = new DataOutputStream( bos ) ;
- this->readMode = false ;
-}
-
-/*
- *
- */
-ActiveMQBytesMessage::ActiveMQBytesMessage(char* body, int size)
-{
- // Convert body to SP array
- array<char> buffer = array<char> (size) ;
- memcpy(buffer.c_array(), body, size);
-
- this->bis = NULL ;
- this->bos = new ByteArrayOutputStream(buffer) ;
- this->dis = NULL ;
- this->dos = new DataOutputStream( bos ) ;
- this->readMode = false ;
-}
-
-/*
- *
- */
-ActiveMQBytesMessage::~ActiveMQBytesMessage()
-{
-}
-
-/*
- *
- */
-unsigned char ActiveMQBytesMessage::getDataStructureType()
-{
- return ActiveMQBytesMessage::TYPE ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::reset()
-{
- if( readMode )
- {
- this->bos = new ByteArrayOutputStream( bis->toArray() ) ;
- this->bis = NULL ;
- this->dos = new DataOutputStream( bos ) ;
- this->dis = NULL ;
- this->readMode = false ;
- }
- else
- {
- this->bis = new ByteArrayInputStream( bos->toArray() ) ;
- this->bos = NULL ;
- this->dis = new DataInputStream( bis ) ;
- this->dos = NULL ;
- this->readMode = true ;
- }
-}
-
-/*
- *
- */
-char ActiveMQBytesMessage::readByte() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a single byte
- return dis->readByte() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-int ActiveMQBytesMessage::readBytes(char* buffer, int offset, int length) throw (MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read some bytes
- return dis->read(buffer, offset, length) ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-bool ActiveMQBytesMessage::readBoolean() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a boolean
- return dis->readBoolean() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-double ActiveMQBytesMessage::readDouble() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a double
- return dis->readDouble() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-float ActiveMQBytesMessage::readFloat() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a float
- return dis->readFloat() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-short ActiveMQBytesMessage::readShort() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a short
- return dis->readShort() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-int ActiveMQBytesMessage::readInt() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read an integer
- return dis->readInt() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-long long ActiveMQBytesMessage::readLong() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a long long
- return dis->readLong() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-p<string> ActiveMQBytesMessage::readString() throw(MessageNotReadableException, MessageEOFException)
-{
- // Assert read mode
- if( !readMode )
- throw MessageNotReadableException() ;
-
- try
- {
- // Read a string
- return dis->readString() ;
- }
- catch( EOFException eof )
- {
- throw MessageEOFException() ;
- }
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeByte(char value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a single byte
- dos->writeByte(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeBytes(char* value, int offset, int length) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write some bytes
- dos->write(value, offset, length) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeBoolean(bool value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a boolean
- dos->writeBoolean(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeDouble(double value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a double
- dos->writeDouble(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeFloat(float value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a float
- dos->writeFloat(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeInt(int value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write an integer
- dos->writeInt(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeLong(long long value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a long long
- dos->writeLong(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeShort(short value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a short
- dos->writeShort(value) ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::writeString(const char* value) throw (MessageNotWritableException)
-{
- // Assert write mode
- if( readMode )
- throw MessageNotWritableException() ;
-
- // Write a string
- p<string> v = new string(value) ;
- dos->writeString(v) ;
-}
-
-/*
- *
- */
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+
+using namespace apache::activemq::command;
+
+
+/*
+ * Creates an empty bytes message in write mode: the output streams are
+ * created, the input streams are left unset.
+ */
+ActiveMQBytesMessage::ActiveMQBytesMessage()
+{
+    // No input side yet
+    bis = NULL ;
+    dis = NULL ;
+
+    // Output side: a data stream layered on a fresh byte array stream
+    bos = new ByteArrayOutputStream() ;
+    dos = new DataOutputStream( bos ) ;
+
+    readMode = false ;
+}
+
+/*
+ * Creates a bytes message in write mode whose output stream is constructed
+ * from a copy of the supplied buffer.
+ *
+ * body - source bytes; exactly size bytes are read from it, so it must
+ *        point to at least that many valid bytes.
+ * size - number of bytes to copy from body.
+ */
+ActiveMQBytesMessage::ActiveMQBytesMessage(char* body, int size)
+{
+    // Copy the caller's bytes into an array<char> of the same size
+    array<char> contents = array<char> (size) ;
+    memcpy( contents.c_array(), body, size ) ;
+
+    bis = NULL ;
+    bos = new ByteArrayOutputStream( contents ) ;
+    dis = NULL ;
+    dos = new DataOutputStream( bos ) ;
+    readMode = false ;
+}
+
+/*
+ * Destructor. The body is empty; no explicit cleanup is performed here.
+ */
+ActiveMQBytesMessage::~ActiveMQBytesMessage()
+{
+}
+
+/*
+ * Returns the data structure type code of this class, i.e. its TYPE
+ * constant.
+ */
+unsigned char ActiveMQBytesMessage::getDataStructureType()
+{
+    return TYPE ;
+}
+
+/*
+ * Toggles the message between write mode and read mode.
+ *
+ * The streams for the new mode are created from the byte array held by
+ * the streams of the old mode, which are then released.
+ */
+void ActiveMQBytesMessage::reset()
+{
+    if( !readMode )
+    {
+        // Write -> read: expose the bytes written so far for reading
+        bis = new ByteArrayInputStream( bos->toArray() ) ;
+        bos = NULL ;
+        dis = new DataInputStream( bis ) ;
+        dos = NULL ;
+        readMode = true ;
+    }
+    else
+    {
+        // Read -> write: build the output streams from the input array
+        bos = new ByteArrayOutputStream( bis->toArray() ) ;
+        bis = NULL ;
+        dos = new DataOutputStream( bos ) ;
+        dis = NULL ;
+        readMode = false ;
+    }
+}
+
+/*
+ * Reads a single byte from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+char ActiveMQBytesMessage::readByte() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a single byte
+        return dis->readByte() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads bytes from the message's data input stream into a caller buffer.
+ *
+ * buffer - destination buffer, passed through to the stream's read().
+ * offset - offset argument, passed through to the stream's read().
+ * length - length argument, passed through to the stream's read().
+ * Returns the value returned by the stream's read() (presumably the
+ * number of bytes read - confirm against DataInputStream).
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+int ActiveMQBytesMessage::readBytes(char* buffer, int offset, int length) throw (MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read some bytes
+        return dis->read(buffer, offset, length) ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a boolean from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+bool ActiveMQBytesMessage::readBoolean() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a boolean
+        return dis->readBoolean() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a double from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+double ActiveMQBytesMessage::readDouble() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a double
+        return dis->readDouble() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a float from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+float ActiveMQBytesMessage::readFloat() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a float
+        return dis->readFloat() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a short from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+short ActiveMQBytesMessage::readShort() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a short
+        return dis->readShort() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads an integer from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+int ActiveMQBytesMessage::readInt() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read an integer
+        return dis->readInt() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a long long from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+long long ActiveMQBytesMessage::readLong() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a long long
+        return dis->readLong() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a string from the message's data input stream.
+ *
+ * Throws MessageNotReadableException when the message is in write mode,
+ * and MessageEOFException when the stream raises EOFException.
+ */
+p<string> ActiveMQBytesMessage::readString() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a string
+        return dis->readString() ;
+    }
+    catch( const EOFException& )
+    {
+        // Caught by const reference: no copy and no slicing of the exception
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Writes a single byte to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeByte(char value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeByte( value ) ;
+}
+
+/*
+ * Writes bytes from a caller buffer to the message's data output stream.
+ *
+ * value  - source buffer, passed through to the stream's write().
+ * offset - offset argument, passed through to the stream's write().
+ * length - length argument, passed through to the stream's write().
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeBytes(char* value, int offset, int length) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->write( value, offset, length ) ;
+}
+
+/*
+ * Writes a boolean to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeBoolean(bool value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeBoolean( value ) ;
+}
+
+/*
+ * Writes a double to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeDouble(double value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeDouble( value ) ;
+}
+
+/*
+ * Writes a float to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeFloat(float value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeFloat( value ) ;
+}
+
+/*
+ * Writes an integer to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeInt(int value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeInt( value ) ;
+}
+
+/*
+ * Writes a long long to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeLong(long long value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeLong( value ) ;
+}
+
+/*
+ * Writes a short to the message's data output stream.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeShort(short value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    dos->writeShort( value ) ;
+}
+
+/*
+ * Writes a string to the message's data output stream.
+ *
+ * value - NUL-terminated text; it is copied into a new string object, so
+ *         it must not be NULL.
+ *
+ * Throws MessageNotWritableException when the message is in read mode.
+ */
+void ActiveMQBytesMessage::writeString(const char* value) throw (MessageNotWritableException)
+{
+    // Writing is refused while the message is in read mode
+    if( readMode )
+    {
+        throw MessageNotWritableException() ;
+    }
+
+    // The stream takes a string object, so wrap the C string first
+    p<string> text = new string( value ) ;
+    dos->writeString( text ) ;
+}
+
+/*
+ *
+ */
int ActiveMQBytesMessage::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> ostream) throw (IOException)
{
int size = 0 ;
@@ -402,9 +402,9 @@
return size ;
}
-/*
- *
- */
+/*
+ *
+ */
void ActiveMQBytesMessage::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> istream) throw (IOException)
{
// Note! Message content unmarshalling is done in super class
Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp Wed Jul 5 20:17:58 2006
@@ -1,352 +1,352 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/DestinationFilter.hpp"
-#include "activemq/command/ActiveMQDestination.hpp"
-#include "activemq/command/ActiveMQTempQueue.hpp"
-#include "activemq/command/ActiveMQTempTopic.hpp"
-#include "activemq/command/ActiveMQQueue.hpp"
-#include "activemq/command/ActiveMQTopic.hpp"
-
-using namespace apache::activemq::command;
-
-
-// --- Static initialization ----------------------------------------
-
-const char* ActiveMQDestination::ADVISORY_PREFIX = "ActiveMQ.Advisory." ;
-const char* ActiveMQDestination::CONSUMER_ADVISORY_PREFIX = "ActiveMQ.Advisory.Consumers." ;
-const char* ActiveMQDestination::PRODUCER_ADVISORY_PREFIX = "ActiveMQ.Advisory.Producers." ;
-const char* ActiveMQDestination::CONNECTION_ADVISORY_PREFIX = "ActiveMQ.Advisory.Connections." ;
-const char* ActiveMQDestination::DEFAULT_ORDERED_TARGET = "coordinator" ;
-
-const char* ActiveMQDestination::TEMP_PREFIX = "{TD{" ;
-const char* ActiveMQDestination::TEMP_POSTFIX = "}TD}" ;
-const char* ActiveMQDestination::COMPOSITE_SEPARATOR = "," ;
-const char* ActiveMQDestination::QUEUE_PREFIX = "queue://" ;
-const char* ActiveMQDestination::TOPIC_PREFIX = "topic://" ;
-
-
-/*
- * Default constructor
- */
-ActiveMQDestination::ActiveMQDestination()
-{
- orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
- physicalName = new string("") ;
- exclusive = false ;
- ordered = false ;
- advisory = false ;
-}
-
-/*
- * Constructs the destination with a specified physical name.
- *
- * @param name the physical name for the destination.
- */
-ActiveMQDestination::ActiveMQDestination(const char* name)
-{
- orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
- physicalName = new string(name) ;
- exclusive = false ;
- ordered = false ;
- advisory = ( name != NULL && (physicalName->find(ADVISORY_PREFIX) == 0)) ? true : false ;
-}
-
-/*
- *
- */
-ActiveMQDestination::~ActiveMQDestination()
-{
- // no-op
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isAdvisory()
-{
- return advisory ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setAdvisory(bool advisory)
-{
- this->advisory = advisory ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isConsumerAdvisory()
-{
- return ( isAdvisory() && (physicalName->find(CONSUMER_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isProducerAdvisory()
-{
- return ( isAdvisory() && (physicalName->find(PRODUCER_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isConnectionAdvisory()
-{
- return ( isAdvisory() && (physicalName->find(CONNECTION_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isExclusive()
-{
- return exclusive ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setExclusive(bool exclusive)
-{
- this->exclusive = exclusive ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isOrdered()
-{
- return ordered ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setOrdered(bool ordered)
-{
- this->ordered = ordered ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getOrderedTarget()
-{
- return orderedTarget ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setOrderedTarget(const char* target)
-{
- this->orderedTarget->assign(target) ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getPhysicalName()
-{
- return physicalName ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setPhysicalName(const char* name)
-{
- physicalName->assign(name) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isTopic()
-{
- return ( getDestinationType() == ACTIVEMQ_TOPIC ||
- getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isQueue()
-{
- return !isTopic() ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isTemporary()
-{
- return ( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ||
- getDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isComposite()
-{
- return ( physicalName->find(ActiveMQDestination::COMPOSITE_SEPARATOR) > 0 ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isWildcard()
-{
- if( physicalName != NULL )
- {
- return ( physicalName->find( DestinationFilter::ANY_CHILD ) >= 0 ||
- physicalName->find( DestinationFilter::ANY_DESCENDENT ) >= 0 ) ;
- }
- return false ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::toString()
-{
- return physicalName ;
-}
-
-// --- Static methods ---------------------------------------------
-
-/*
- * A helper method to return a descriptive string for the topic or queue.
- */
-p<string> ActiveMQDestination::inspect(p<ActiveMQDestination> destination)
-{
- p<string> description = new string() ;
-
- if( typeid(*destination) == typeid(ITopic) )
- description->assign("Topic(") ;
- else
- description->assign("Queue(") ;
-
- description->append( destination->toString()->c_str() ) ;
- description->append(")") ;
-
- return description ;
-}
-
-/*
- *
- */
-/*p<ActiveMQDestination> ActiveMQDestination::transform(p<IDestination> destination)
-{
- p<ActiveMQDestination> result = NULL ;
-
- if( destination != NULL )
- {
- if( typeid(*destination) == typeid(ActiveMQDestination) )
- result = p_cast<ActiveMQDestination> (destination) ;
-
- else
- {
- if( typeid(ITopic).before(typeid(IDestination)) )
- result = new ActiveMQTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
-
- else if( typeid(*destination).before(typeid(IQueue)) )
- result = new ActiveMQQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
-
- else if( typeid(ITemporaryQueue).before(typeid(*destination)) )
- result = new ActiveMQTempQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
-
- else if( typeid(ITemporaryTopic).before(typeid(*destination)) )
- result = new ActiveMQTempTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
-
- }
- }
- return result ;
-}*/
-
-/*
- *
- */
-p<ActiveMQDestination> ActiveMQDestination::createDestination(int type, const char* physicalName)
-{
- p<ActiveMQDestination> result = NULL ;
-
- if( type == ActiveMQDestination::ACTIVEMQ_TOPIC )
- result = new ActiveMQTopic(physicalName) ;
-
- else if( type == ActiveMQDestination::ACTIVEMQ_TEMPORARY_TOPIC )
- result = new ActiveMQTempTopic(physicalName) ;
-
- else if (type == ActiveMQDestination::ACTIVEMQ_QUEUE)
- result = new ActiveMQQueue(physicalName) ;
-
- else
- result = new ActiveMQTempQueue(physicalName) ;
-
- return result ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::createTemporaryName(const char* clientId)
-{
- p<string> tempName = new string() ;
-
- tempName->assign( ActiveMQDestination::TEMP_PREFIX ) ;
- tempName->append(clientId) ;
- tempName->append( ActiveMQDestination::TEMP_POSTFIX ) ;
-
- return tempName ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getClientId(p<ActiveMQDestination> destination)
-{
- p<string> answer = NULL ;
-
- if( destination != NULL && destination->isTemporary() )
- {
- p<string> name = destination->getPhysicalName() ;
- int start = (int)name->find(TEMP_PREFIX),
- stop ;
-
- if( start >= 0 )
- {
- start += (int)strlen(TEMP_PREFIX) ;
- stop = (int)name->find_last_of(TEMP_POSTFIX) ;
-
- if( stop > start && stop < (int)name->length() )
- answer->assign( name->substr(start, stop) ) ;
- }
- }
- return answer;
-}
-
-/*
- *
- */
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/DestinationFilter.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+
+using namespace apache::activemq::command;
+
+
+// --- Static initialization ----------------------------------------
+
+// Physical-name prefixes tested by the isAdvisory-style predicates, and the
+// ordered target assigned by the constructors.
+const char* ActiveMQDestination::ADVISORY_PREFIX = "ActiveMQ.Advisory." ;
+const char* ActiveMQDestination::CONSUMER_ADVISORY_PREFIX = "ActiveMQ.Advisory.Consumers." ;
+const char* ActiveMQDestination::PRODUCER_ADVISORY_PREFIX = "ActiveMQ.Advisory.Producers." ;
+const char* ActiveMQDestination::CONNECTION_ADVISORY_PREFIX = "ActiveMQ.Advisory.Connections." ;
+const char* ActiveMQDestination::DEFAULT_ORDERED_TARGET = "coordinator" ;
+
+// TEMP_PREFIX / TEMP_POSTFIX wrap the client id in temporary destination
+// names; COMPOSITE_SEPARATOR is what isComposite() searches for.
+// NOTE(review): QUEUE_PREFIX and TOPIC_PREFIX are not used in this part of
+// the file - presumably URI-style destination prefixes; confirm with callers.
+const char* ActiveMQDestination::TEMP_PREFIX = "{TD{" ;
+const char* ActiveMQDestination::TEMP_POSTFIX = "}TD}" ;
+const char* ActiveMQDestination::COMPOSITE_SEPARATOR = "," ;
+const char* ActiveMQDestination::QUEUE_PREFIX = "queue://" ;
+const char* ActiveMQDestination::TOPIC_PREFIX = "topic://" ;
+
+
+/*
+ * Default constructor: empty physical name, default ordered target, and
+ * all flags (exclusive, ordered, advisory) cleared.
+ */
+ActiveMQDestination::ActiveMQDestination()
+{
+    // Flags start out cleared
+    this->exclusive = false ;
+    this->ordered = false ;
+    this->advisory = false ;
+
+    // String members always point at a string object
+    this->orderedTarget = new string( DEFAULT_ORDERED_TARGET ) ;
+    this->physicalName = new string() ;
+}
+
+/*
+ * Constructs the destination with a specified physical name.
+ *
+ * The advisory flag is set when the name starts with ADVISORY_PREFIX.
+ *
+ * @param name the physical name for the destination; a NULL name is
+ *             stored as the empty string.
+ */
+ActiveMQDestination::ActiveMQDestination(const char* name)
+{
+    orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
+
+    // Constructing a string from a NULL pointer is undefined, so the NULL
+    // check has to happen before the string is built, not after
+    physicalName = new string( name != NULL ? name : "" ) ;
+    exclusive = false ;
+    ordered = false ;
+    advisory = ( name != NULL && physicalName->find(ADVISORY_PREFIX) == 0 ) ;
+}
+
+/*
+ * Destructor. Performs no explicit cleanup.
+ */
+ActiveMQDestination::~ActiveMQDestination()
+{
+ // no-op
+}
+
+/*
+ * Returns the advisory flag of this destination.
+ */
+bool ActiveMQDestination::isAdvisory()
+{
+    return this->advisory ;
+}
+
+/*
+ * Sets the advisory flag of this destination.
+ *
+ * advisory - new value of the flag.
+ */
+void ActiveMQDestination::setAdvisory(bool advisory)
+{
+ this->advisory = advisory ;
+}
+
+/*
+ * Returns true when this is an advisory destination whose physical name
+ * starts with CONSUMER_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isConsumerAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return ( physicalName->find(CONSUMER_ADVISORY_PREFIX) == 0 ) ;
+}
+
+/*
+ * Returns true when this is an advisory destination whose physical name
+ * starts with PRODUCER_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isProducerAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return ( physicalName->find(PRODUCER_ADVISORY_PREFIX) == 0 ) ;
+}
+
+/*
+ * Returns true when this is an advisory destination whose physical name
+ * starts with CONNECTION_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isConnectionAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return ( physicalName->find(CONNECTION_ADVISORY_PREFIX) == 0 ) ;
+}
+
+/*
+ * Returns the exclusive flag of this destination.
+ */
+bool ActiveMQDestination::isExclusive()
+{
+    return this->exclusive ;
+}
+
+/*
+ * Sets the exclusive flag of this destination.
+ *
+ * exclusive - new value of the flag.
+ */
+void ActiveMQDestination::setExclusive(bool exclusive)
+{
+ this->exclusive = exclusive ;
+}
+
+/*
+ * Returns the ordered flag of this destination.
+ */
+bool ActiveMQDestination::isOrdered()
+{
+    return this->ordered ;
+}
+
+/*
+ * Sets the ordered flag of this destination.
+ *
+ * ordered - new value of the flag.
+ */
+void ActiveMQDestination::setOrdered(bool ordered)
+{
+ this->ordered = ordered ;
+}
+
+/*
+ * Returns the ordered target. The stored pointer itself is returned, not
+ * a copy of the string it refers to.
+ */
+p<string> ActiveMQDestination::getOrderedTarget()
+{
+    return this->orderedTarget ;
+}
+
+/*
+ * Replaces the contents of the ordered target string.
+ *
+ * target - NUL-terminated text to copy; must not be NULL.
+ */
+void ActiveMQDestination::setOrderedTarget(const char* target)
+{
+    // Overwrite the existing string object in place
+    orderedTarget->assign( target ) ;
+}
+
+/*
+ * Returns the physical name. The stored pointer itself is returned, not
+ * a copy of the string it refers to.
+ */
+p<string> ActiveMQDestination::getPhysicalName()
+{
+    return this->physicalName ;
+}
+
+/*
+ * Replaces the contents of the physical name string. The advisory flag
+ * is not recomputed here.
+ *
+ * name - NUL-terminated text to copy; must not be NULL.
+ */
+void ActiveMQDestination::setPhysicalName(const char* name)
+{
+    // Overwrite the existing string object in place
+    this->physicalName->assign( name ) ;
+}
+
+/*
+ * Returns true when the destination type is a topic or a temporary topic.
+ */
+bool ActiveMQDestination::isTopic()
+{
+    if( getDestinationType() == ACTIVEMQ_TOPIC )
+        return true ;
+
+    return ( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ) ;
+}
+
+/*
+ * Returns true for every destination that isTopic() does not report as
+ * a topic.
+ */
+bool ActiveMQDestination::isQueue()
+{
+    if( isTopic() )
+        return false ;
+
+    return true ;
+}
+
+/*
+ * Returns true when the destination type is a temporary topic or a
+ * temporary queue.
+ */
+bool ActiveMQDestination::isTemporary()
+{
+    if( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC )
+        return true ;
+
+    return ( getDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE ) ;
+}
+
+/*
+ * Returns true when the physical name contains COMPOSITE_SEPARATOR.
+ */
+bool ActiveMQDestination::isComposite()
+{
+    // find() returns string::npos (the largest size_type value) when there
+    // is no match, so "> 0" was true for names without a separator and
+    // false for a separator at index 0; compare against npos instead
+    return ( physicalName->find(ActiveMQDestination::COMPOSITE_SEPARATOR) != string::npos ) ;
+}
+
+/*
+ * Returns true when the physical name contains DestinationFilter::ANY_CHILD
+ * or DestinationFilter::ANY_DESCENDENT; false when there is no physical
+ * name or neither token occurs in it.
+ */
+bool ActiveMQDestination::isWildcard()
+{
+    if( physicalName != NULL )
+    {
+        // find() returns an unsigned size_type, so ">= 0" was always true;
+        // a miss is reported as string::npos
+        return ( physicalName->find( DestinationFilter::ANY_CHILD ) != string::npos ||
+                 physicalName->find( DestinationFilter::ANY_DESCENDENT ) != string::npos ) ;
+    }
+    return false ;
+}
+
+/*
+ * Returns the physical name as the string form of this destination. The
+ * stored pointer itself is returned, not a copy.
+ */
+p<string> ActiveMQDestination::toString()
+{
+    return this->physicalName ;
+}
+
+// --- Static methods ---------------------------------------------
+
+/*
+ * A helper method to return a descriptive string for the topic or queue:
+ * "Topic(<name>)" when isTopic() is true, "Queue(<name>)" otherwise.
+ *
+ * destination - destination to describe; must not be NULL.
+ */
+p<string> ActiveMQDestination::inspect(p<ActiveMQDestination> destination)
+{
+    p<string> description = new string() ;
+
+    // Ask the destination itself. Comparing typeid(*destination) with
+    // typeid(ITopic) tests for that exact type, which a concrete
+    // destination class never has, so everything was reported as a queue.
+    if( destination->isTopic() )
+        description->assign("Topic(") ;
+    else
+        description->assign("Queue(") ;
+
+    description->append( destination->toString()->c_str() ) ;
+    description->append(")") ;
+
+    return description ;
+}
+
+/*
+ *
+ */
+/*p<ActiveMQDestination> ActiveMQDestination::transform(p<IDestination> destination)
+{
+ p<ActiveMQDestination> result = NULL ;
+
+ if( destination != NULL )
+ {
+ if( typeid(*destination) == typeid(ActiveMQDestination) )
+ result = p_cast<ActiveMQDestination> (destination) ;
+
+ else
+ {
+ if( typeid(ITopic).before(typeid(IDestination)) )
+ result = new ActiveMQTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
+
+ else if( typeid(*destination).before(typeid(IQueue)) )
+ result = new ActiveMQQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
+
+ else if( typeid(ITemporaryQueue).before(typeid(*destination)) )
+ result = new ActiveMQTempQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
+
+ else if( typeid(ITemporaryTopic).before(typeid(*destination)) )
+ result = new ActiveMQTempTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
+
+ }
+ }
+ return result ;
+}*/
+
+/*
+ * Factory for concrete destinations.
+ *
+ * type         - one of the ACTIVEMQ_* destination type constants; any
+ *                value other than topic, temporary topic or queue yields
+ *                a temporary queue.
+ * physicalName - name handed to the constructor of the created object.
+ * Returns the newly created destination.
+ */
+p<ActiveMQDestination> ActiveMQDestination::createDestination(int type, const char* physicalName)
+{
+    p<ActiveMQDestination> created = NULL ;
+
+    if( type == ACTIVEMQ_TOPIC )
+    {
+        created = new ActiveMQTopic( physicalName ) ;
+    }
+    else if( type == ACTIVEMQ_TEMPORARY_TOPIC )
+    {
+        created = new ActiveMQTempTopic( physicalName ) ;
+    }
+    else if( type == ACTIVEMQ_QUEUE )
+    {
+        created = new ActiveMQQueue( physicalName ) ;
+    }
+    else
+    {
+        // Fallback for every remaining type value
+        created = new ActiveMQTempQueue( physicalName ) ;
+    }
+
+    return created ;
+}
+
+/*
+ * Builds a temporary destination name of the form
+ * TEMP_PREFIX + clientId + TEMP_POSTFIX.
+ *
+ * clientId - NUL-terminated client id to embed; must not be NULL.
+ * Returns the newly allocated name.
+ */
+p<string> ActiveMQDestination::createTemporaryName(const char* clientId)
+{
+    // Start from the prefix, then append the id and the closing marker
+    p<string> result = new string( TEMP_PREFIX ) ;
+
+    result->append( clientId ) ;
+    result->append( TEMP_POSTFIX ) ;
+
+    return result ;
+}
+
+/*
+ * Extracts the client id embedded in a temporary destination's physical
+ * name, i.e. the text between TEMP_PREFIX and the last TEMP_POSTFIX.
+ *
+ * destination - destination to inspect; may be NULL.
+ * Returns the client id, or NULL when the destination is NULL, is not
+ * temporary, or its name holds no non-empty id between the two markers.
+ */
+p<string> ActiveMQDestination::getClientId(p<ActiveMQDestination> destination)
+{
+    p<string> answer = NULL ;
+
+    if( destination != NULL && destination->isTemporary() )
+    {
+        p<string> name = destination->getPhysicalName() ;
+
+        if( name != NULL )
+        {
+            string::size_type start = name->find(TEMP_PREFIX) ;
+
+            if( start != string::npos )
+            {
+                // Skip the prefix itself. rfind() matches the whole postfix,
+                // whereas find_last_of() matches any one of its characters.
+                start += strlen(TEMP_PREFIX) ;
+                string::size_type stop = name->rfind(TEMP_POSTFIX) ;
+
+                // substr() takes a length, not an end position, and the
+                // result needs a newly allocated string: answer is still
+                // NULL here, so it cannot be assigned through
+                if( stop != string::npos && stop > start )
+                    answer = new string( name->substr(start, stop - start) ) ;
+            }
+        }
+    }
+    return answer ;
+}
+
+/*
+ *
+ */
int ActiveMQDestination::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> ostream) throw (IOException)
{
int size = 0 ;
@@ -355,9 +355,9 @@
return size ;
}
-/*
- *
- */
+/*
+ *
+ */
void ActiveMQDestination::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> istream) throw (IOException)
{
physicalName = p_cast<string>(marshaller->unmarshalString(mode, istream)) ;
Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp
------------------------------------------------------------------------------
svn:eol-style = native