You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/06 05:18:03 UTC

svn commit: r419422 [2/11] - in /incubator/activemq/trunk/openwire-cpp/src/main/cpp: activemq/ activemq/command/ activemq/protocol/openwire/ activemq/transport/ activemq/transport/tcp/ cms/ ppr/ ppr/io/ ppr/io/encoding/ ppr/net/ ppr/thread/ ppr/util/

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp Wed Jul  5 20:17:58 2006
@@ -1,514 +1,514 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/Session.hpp"
-#include "activemq/command/ActiveMQDestination.hpp"
-#include "activemq/command/ActiveMQQueue.hpp"
-#include "activemq/command/ActiveMQTopic.hpp"
-#include "activemq/command/ActiveMQTempQueue.hpp"
-#include "activemq/command/ActiveMQTempTopic.hpp"
-#include "activemq/command/ActiveMQMessage.hpp"
-#include "activemq/command/ActiveMQBytesMessage.hpp"
-#include "activemq/command/ActiveMQMapMessage.hpp"
-#include "activemq/command/ActiveMQTextMessage.hpp"
-#include "activemq/command/ProducerInfo.hpp"
-#include "activemq/command/ConsumerInfo.hpp"
-#include "activemq/command/MessageAck.hpp"
-#include "activemq/MessageConsumer.hpp"
-#include "activemq/MessageProducer.hpp"
-#include "activemq/Connection.hpp"
-
-using namespace apache::activemq;
-
-
-// Constructors -----------------------------------------------------
-
-/*
- * 
- */
-Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
-{
-    this->connection         = connection ;
-    this->sessionInfo        = info ;
-    this->ackMode            = ackMode ;
-    this->prefetchSize       = 1000 ;
-    this->consumerCounter    = 0 ;
-    this->producerCounter    = 0 ;
-    this->transactionContext = new TransactionContext(smartify(this)) ;
-    this->dispatchThread     = new DispatchThread(smartify(this)) ;
-    this->closed             = false ;
-
-    // Activate backround dispatch thread
-    dispatchThread->start() ;
-}
-
-/*
- * 
- */
-Session::~Session()
-{
-    // Make sure session is closed
-    close() ;
-}
-
-
-// Attribute methods ------------------------------------------------
-
-/*
- * 
- */
-bool Session::isTransacted()
-{
-    return ( ackMode == TransactionalAckMode ) ? true : false ; 
-}
-
-/*
- * 
- */
-p<Connection> Session::getConnection()
-{
-    return connection ;
-}
-
-/*
- * 
- */
-p<SessionId> Session::getSessionId()
-{
-    return sessionInfo->getSessionId() ;
-}
-
-/*
- * 
- */
-p<TransactionContext> Session::getTransactionContext()
-{
-    return transactionContext ;
-}
-
-/*
- * 
- */
-p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
-{
-    map<long long, p<MessageConsumer> >::iterator tempIter ;
-
-    // Check if key exists in map
-    tempIter = consumers.find( consumerId->getValue() ) ;
-    if( tempIter == consumers.end() )
-        return NULL ;
-    else
-        return tempIter->second ;
-}
-
-/*
- * 
- */
-p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
-{
-    map<long long, p<MessageProducer> >::iterator tempIter ;
-
-    // Check if key exists in map
-    tempIter = producers.find( producerId->getValue() ) ;
-    if( tempIter == producers.end() )
-        return NULL ;
-    else
-        return tempIter->second ;
-}
-
-
-// Operation methods ------------------------------------------------
-
-/*
- * 
- */
-p<IMessageProducer> Session::createProducer()
-{
-    return createProducer(NULL) ; 
-}
-
-/*
- * 
- */
-p<IMessageProducer> Session::createProducer(p<IDestination> destination)
-{
-    p<ProducerInfo> command  = createProducerInfo(destination) ;
-    p<ProducerId> producerId = command->getProducerId() ;
-
-    try
-    {
-        p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
-
-        // Save the producer
-        producers[ producerId->getValue() ] = producer ;
-
-        // Register producer with broker
-        connection->syncRequest(command) ;
-
-        return producer ;
-    }
-    catch( exception e )
-    {
-        // Make sure producer was removed
-        producers[ producerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-/*
- * 
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
-{
-    return createConsumer(destination, NULL) ; 
-}
-
-/*
- * 
- */
-p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
-{
-    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
-    p<ConsumerId> consumerId = command->getConsumerId() ;
-
-    try
-    {
-        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
-        // Save the consumer first in case message dispatching starts immediately
-        consumers[ consumerId->getValue() ] = consumer ;
-
-        // Register consumer with broker
-        connection->syncRequest(command) ;
-
-        return consumer ;
-    }
-    catch( exception e )
-    {
-        // Make sure consumer was removed
-        consumers[ consumerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
-{
-    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
-    p<ConsumerId> consumerId = command->getConsumerId() ;
-    p<string>     subscriptionName = new string(name) ;
-
-    command->setSubcriptionName( subscriptionName ) ;
-    command->setNoLocal( noLocal ) ;
-    
-    try
-    {
-        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
-
-        // Save the consumer first in case message dispatching starts immediately
-        consumers[ consumerId->getValue() ] = consumer ;
-
-        // Register consumer with broker
-        connection->syncRequest(command) ;
-
-        return consumer ;
-    }
-    catch( exception e )
-    {
-        // Make sure consumer was removed
-        consumers[ consumerId->getValue() ] = NULL ;
-        throw e ;
-    }
-}
-
-/*
- * 
- */
-p<IQueue> Session::getQueue(const char* name)
-{
-    p<IQueue> queue = new ActiveMQQueue(name) ;
-    return queue ;
-}
-
-/*
- * 
- */
-p<ITopic> Session::getTopic(const char* name)
-{
-    p<ITopic> topic = new ActiveMQTopic(name) ;
-    return topic ;
-}
-
-/*
- * 
- */
-p<ITemporaryQueue> Session::createTemporaryQueue()
-{
-    p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
-    return queue ;
-}
-
-/*
- * 
- */
-p<ITemporaryTopic> Session::createTemporaryTopic()
-{
-    p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
-    return topic ;
-}
-
-/*
- * 
- */
-p<IMessage> Session::createMessage()
-{
-    p<IMessage> message = new ActiveMQMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IBytesMessage> Session::createBytesMessage()
-{
-    p<IBytesMessage> message = new ActiveMQBytesMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IBytesMessage> Session::createBytesMessage(char* body, int size)
-{
-    p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<IMapMessage> Session::createMapMessage()
-{
-    p<IMapMessage> message = new ActiveMQMapMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<ITextMessage> Session::createTextMessage()
-{
-    p<ITextMessage> message = new ActiveMQTextMessage() ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-p<ITextMessage> Session::createTextMessage(const char* text)
-{
-    p<ITextMessage> message = new ActiveMQTextMessage(text) ;
-    configure(message) ;
-    return message ;
-}
-
-/*
- * 
- */
-void Session::commit() throw(CmsException)
-{
-    if( !isTransacted() )
-        throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
-    transactionContext->commit() ;
-}
-
-/*
- * 
- */
-void Session::rollback() throw(CmsException)
-{
-    if( !isTransacted() )
-        throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
-
-    transactionContext->rollback() ;
-
-    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
-    // Ensure all the consumers redeliver any rolled back messages
-    for( tempIter = consumers.begin() ;
-         tempIter != consumers.end() ;
-         tempIter++ )
-    {
-        ((*tempIter).second)->redeliverRolledBackMessages() ;
-    }
-}
-
-/*
- * 
- */
-void Session::doSend(p<IDestination> destination, p<IMessage> message)
-{
-    p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
-    // TODO complete packet
-    connection->syncRequest(command) ;
-}
-
-/*
- * Starts a new transaction
- */
-void Session::doStartTransaction()
-{
-    if( isTransacted() )
-        transactionContext->begin() ;
-}
-
-/*
- * 
- */
-void Session::dispatch(int delay)
-{
-    if( delay > 0 ) 
-        dispatchThread->sleep(delay) ;
-
-    dispatchThread->wakeup() ;
-}
-
-/*
- * 
- */
-void Session::dispatchAsyncMessages()
-{
-    // Ensure that only 1 thread dispatches messages in a consumer at once
-    LOCKED_SCOPE (mutex);
-
-    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
-
-    // Iterate through each consumer created by this session
-    // ensuring that they have all pending messages dispatched
-    for( tempIter = consumers.begin() ;
-         tempIter != consumers.end() ;
-         tempIter++ )
-    {
-        ((*tempIter).second)->dispatchAsyncMessages() ;
-    }
-}
-
-/*
- *
- */
-void Session::close()
-{
-    if( !closed )
-    {
-        map<long long, p<MessageConsumer> >::iterator consumerIter ;
-        map<long long, p<MessageProducer> >::iterator producerIter ;
-
-        // Shutdown dispatch thread
-        dispatchThread->interrupt() ;
-        dispatchThread->join() ;
-        dispatchThread = NULL ;
-
-        // Iterate through all consumers and close them down
-        for( consumerIter = consumers.begin() ;
-             consumerIter != consumers.end() ;
-             consumerIter++ )
-        {
-            consumerIter->second->close() ;
-            consumerIter->second = NULL ;
-        }
-
-        // Iterate through all producers and close them down
-        for( producerIter = producers.begin() ;
-             producerIter != producers.end() ;
-             producerIter++ )
-        {
-            producerIter->second->close() ;
-            producerIter->second = NULL ;
-        }
-        // De-register session from broker/connection
-        connection->disposeOf( sessionInfo->getSessionId() ) ;
-
-        // Clean up
-        connection = NULL ;
-        closed     = true ;
-
-    }
-}
-
-
-// Implementation methods ------------------------------------------
-
-/*
- * 
- */
-p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
-{
-    p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
-    p<ConsumerId> consumerId = new ConsumerId() ;
-
-    consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
-    consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
-    {
-        LOCKED_SCOPE (mutex);
-        consumerId->setValue( ++consumerCounter ) ;
-    }
-    p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
-
-    // TODO complete packet
-    consumerInfo->setConsumerId( consumerId ) ;
-    consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
-    consumerInfo->setSelector( sel ) ;
-    consumerInfo->setPrefetchSize( this->prefetchSize ) ;
-
-    return consumerInfo ;
-}
-
-/*
- * 
- */
-p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
-{
-    p<ProducerInfo> producerInfo = new ProducerInfo() ;
-    p<ProducerId> producerId = new ProducerId() ;
-
-    producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
-    producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
-
-    {
-        LOCKED_SCOPE (mutex);
-        producerId->setValue( ++producerCounter ) ;
-    }
-
-    // TODO complete packet
-    producerInfo->setProducerId( producerId ) ;
-    producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
-
-    return producerInfo ;
-} 
-
-/*
- * Configures the message command.
- */
-void Session::configure(p<IMessage> message)
-{
-    // TODO:
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Session.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+#include "activemq/command/ActiveMQMapMessage.hpp"
+#include "activemq/command/ActiveMQTextMessage.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Connection.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ * Creates a session on the given connection with the supplied session
+ * info and acknowledgement mode, then starts its dispatch thread.
+ */
+Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
+{
+    this->connection   = connection ;
+    this->ackMode      = ackMode ;
+    sessionInfo        = info ;
+    prefetchSize       = 1000 ;
+    consumerCounter    = 0 ;
+    producerCounter    = 0 ;
+    closed             = false ;
+    // Helpers built from smartify(this)
+    transactionContext = new TransactionContext( smartify(this) ) ;
+    dispatchThread     = new DispatchThread( smartify(this) ) ;
+    dispatchThread->start() ;  // activate background dispatch thread
+}
+
+/*
+ * Destroys the session, closing it first; no exception may escape from here.
+ */
+Session::~Session()
+{
+    try { close() ; }
+    catch( ... ) { /* close() talks to the connection and may throw; a destructor must not */ }
+}
+
+
+// Attribute methods ------------------------------------------------
+
+/*
+ * Tells whether the session's acknowledgement mode is TransactionalAckMode.
+ */
+bool Session::isTransacted()
+{
+    return ackMode == TransactionalAckMode ;
+}
+
+/*
+ * Returns the connection this session was created on (NULL after close()).
+ */
+p<Connection> Session::getConnection()
+{
+    return this->connection ;
+}
+
+/*
+ * Returns the session id held by this session's SessionInfo.
+ */
+p<SessionId> Session::getSessionId()
+{
+    return this->sessionInfo->getSessionId() ;
+}
+
+/*
+ * Returns the transaction context created in the constructor.
+ */
+p<TransactionContext> Session::getTransactionContext()
+{
+    return this->transactionContext ;
+}
+
+/*
+ * Looks up a consumer of this session by the value of its consumer id.
+ * Returns NULL when nothing is stored in the map under that value.
+ */
+p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
+{
+    map<long long, p<MessageConsumer> >::iterator found =
+        consumers.find( consumerId->getValue() ) ;
+
+    // Guard clause: unknown key
+    if( found == consumers.end() )
+        return NULL ;
+    return found->second ;
+}
+
+/*
+ * Looks up a producer of this session by the value of its producer id.
+ * Returns NULL when nothing is stored in the map under that value.
+ */
+p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
+{
+    map<long long, p<MessageProducer> >::iterator found =
+        producers.find( producerId->getValue() ) ;
+
+    // Guard clause: unknown key
+    if( found == producers.end() )
+        return NULL ;
+    return found->second ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ * Creates a producer with a NULL destination.
+ */
+p<IMessageProducer> Session::createProducer()
+{
+    return createProducer( NULL ) ;
+}
+
+/*
+ * Creates a producer for the given destination, stores it in this session's
+ * producer map and registers it with the broker through a sync request.
+ */
+p<IMessageProducer> Session::createProducer(p<IDestination> destination)
+{
+    p<ProducerInfo> command  = createProducerInfo(destination) ;
+    p<ProducerId> producerId = command->getProducerId() ;
+
+    try
+    {
+        p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
+
+        // Save the producer
+        producers[ producerId->getValue() ] = producer ;
+
+        // Register producer with broker
+        connection->syncRequest(command) ;
+        return producer ;
+    }
+    catch( ... )
+    {
+        // Erase instead of storing NULL: close() calls close() on every map entry
+        producers.erase( producerId->getValue() ) ;
+        throw ;  // rethrow the original object; "throw e" would throw a sliced copy
+    }
+}
+
+/*
+ * Creates a consumer on the given destination with a NULL selector.
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
+{
+    return createConsumer( destination, NULL ) ;
+}
+
+/*
+ * Creates a consumer for the given destination and selector, stores it in
+ * this session's consumer map and registers it with the broker.
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+        return consumer ;
+    }
+    catch( ... )
+    {
+        // Erase instead of storing NULL: the session's loops dereference every entry
+        consumers.erase( consumerId->getValue() ) ;
+        throw ;  // rethrow the original object; "throw e" would throw a sliced copy
+    }
+}
+
+/*
+ * Like createConsumer(), but the command also carries a subscription name and noLocal.
+ */
+p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+    p<string>     subscriptionName = new string(name) ;
+
+    command->setSubcriptionName( subscriptionName ) ;
+    command->setNoLocal( noLocal ) ;
+
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+        return consumer ;
+    }
+    catch( ... )
+    {
+        // Erase instead of storing NULL: the session's loops dereference every entry
+        consumers.erase( consumerId->getValue() ) ;
+        throw ;  // rethrow the original object; "throw e" would throw a sliced copy
+    }
+}
+
+/*
+ * Creates an ActiveMQQueue for the given name and returns it through
+ * the IQueue interface.
+ */
+p<IQueue> Session::getQueue(const char* name)
+{
+    return new ActiveMQQueue( name ) ;
+}
+
+/*
+ * Creates an ActiveMQTopic for the given name and returns it through
+ * the ITopic interface.
+ */
+p<ITopic> Session::getTopic(const char* name)
+{
+    return new ActiveMQTopic( name ) ;
+}
+
+/*
+ * Creates an ActiveMQTempQueue named with a temporary destination name
+ * obtained from the connection.
+ */
+p<ITemporaryQueue> Session::createTemporaryQueue()
+{
+    return new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
+}
+
+/*
+ * Creates an ActiveMQTempTopic named with a temporary destination name
+ * obtained from the connection.
+ */
+p<ITemporaryTopic> Session::createTemporaryTopic()
+{
+    return new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
+}
+
+/*
+ * Creates an empty ActiveMQMessage and passes it through configure().
+ */
+p<IMessage> Session::createMessage()
+{
+    p<IMessage> msg = new ActiveMQMessage() ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Creates an empty ActiveMQBytesMessage and passes it through configure().
+ */
+p<IBytesMessage> Session::createBytesMessage()
+{
+    p<IBytesMessage> msg = new ActiveMQBytesMessage() ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Creates an ActiveMQBytesMessage from the given body and size, then configures it.
+ */
+p<IBytesMessage> Session::createBytesMessage(char* body, int size)
+{
+    p<IBytesMessage> msg = new ActiveMQBytesMessage( body, size ) ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Creates an empty ActiveMQMapMessage and passes it through configure().
+ */
+p<IMapMessage> Session::createMapMessage()
+{
+    p<IMapMessage> msg = new ActiveMQMapMessage() ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Creates an empty ActiveMQTextMessage and passes it through configure().
+ */
+p<ITextMessage> Session::createTextMessage()
+{
+    p<ITextMessage> msg = new ActiveMQTextMessage() ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Creates an ActiveMQTextMessage from the given text, then configures it.
+ */
+p<ITextMessage> Session::createTextMessage(const char* text)
+{
+    p<ITextMessage> msg = new ActiveMQTextMessage( text ) ;
+    configure( msg ) ;
+    return msg ;
+}
+
+/*
+ * Commits via the transaction context; throws CmsException if not transacted.
+ */
+void Session::commit() throw(CmsException)
+{
+    // Plain literal: adding ackMode to it would offset the char pointer, not append text
+    if( !isTransacted() )
+        throw CmsException("You cannot perform a commit on a non-transacted session.") ;
+    transactionContext->commit() ;
+}
+
+/*
+ * Rolls back via the transaction context and asks every consumer to
+ * redeliver its rolled back messages. Throws CmsException if not transacted.
+ */
+void Session::rollback() throw(CmsException)
+{
+    // Plain literal: adding ackMode to it would offset the char pointer, not append text
+    if( !isTransacted() )
+        throw CmsException("You cannot perform a rollback on a non-transacted session.") ;
+
+    transactionContext->rollback() ;
+    // Ensure all the consumers redeliver any rolled back messages
+    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+    for( tempIter = consumers.begin() ; tempIter != consumers.end() ; ++tempIter )
+    {
+        // Defensive: skip NULL map entries instead of dereferencing them
+        if( tempIter->second != NULL )
+            tempIter->second->redeliverRolledBackMessages() ;
+    }
+}
+
+/*
+ * Casts the message to ActiveMQMessage and sends it as a sync request; destination is unused.
+ */
+void Session::doSend(p<IDestination> destination, p<IMessage> message)
+{
+    p<ActiveMQMessage> packet = p_dyncast<ActiveMQMessage>( message ) ;
+    // TODO complete packet
+    connection->syncRequest( packet ) ;
+}
+
+/* Calls begin() on the transaction context; does nothing when the
+ * session is not transacted. */
+void Session::doStartTransaction()
+{
+    if( !isTransacted() )
+        return ;
+    transactionContext->begin() ;
+}
+
+/*
+ * Calls sleep(delay) on the dispatch thread when delay is positive, then wakes it up.
+ */
+void Session::dispatch(int delay)
+{
+    if( 0 < delay )
+        dispatchThread->sleep( delay ) ;
+
+    dispatchThread->wakeup() ;
+}
+
+/*
+ * Asks every consumer of this session to dispatch its pending messages.
+ */
+void Session::dispatchAsyncMessages()
+{
+    // Ensure that only 1 thread dispatches messages in a consumer at once
+    LOCKED_SCOPE (mutex);
+
+    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+    // Iterate through each consumer created by this session
+    // ensuring that they have all pending messages dispatched
+    for( tempIter = consumers.begin() ; tempIter != consumers.end() ; ++tempIter )
+    {
+        // Defensive: skip NULL map entries instead of dereferencing them
+        if( tempIter->second == NULL )
+            continue ;
+        tempIter->second->dispatchAsyncMessages() ;
+    }
+}
+
+/*
+ * Closes the session: stops the dispatch thread, closes every consumer and
+ * producer, de-registers the session from the connection and marks it closed.
+ * Steps already done are skipped, so close() may be retried (e.g. by the
+ * destructor) after an earlier attempt threw part-way through.
+ */
+void Session::close()
+{
+    if( closed )
+        return ;
+
+    map<long long, p<MessageConsumer> >::iterator consumerIter ;
+    map<long long, p<MessageProducer> >::iterator producerIter ;
+    // Shutdown dispatch thread
+    if( dispatchThread != NULL )
+    {
+        dispatchThread->interrupt() ;
+        dispatchThread->join() ;
+        dispatchThread = NULL ;
+    }
+    // Close all consumers; NULL entries have nothing left to close
+    for( consumerIter = consumers.begin() ; consumerIter != consumers.end() ; ++consumerIter )
+    {
+        if( consumerIter->second != NULL )
+            consumerIter->second->close() ;
+        consumerIter->second = NULL ;
+    }
+    // Close all producers the same way
+    for( producerIter = producers.begin() ; producerIter != producers.end() ; ++producerIter )
+    {
+        if( producerIter->second != NULL )
+            producerIter->second->close() ;
+        producerIter->second = NULL ;
+    }
+    // De-register session from broker/connection
+    if( connection != NULL )
+        connection->disposeOf( sessionInfo->getSessionId() ) ;
+    // Clean up
+    connection = NULL ;
+    closed     = true ;
+}
+
+
+// Implementation methods ------------------------------------------
+
+/*
+ * Builds the ConsumerInfo command for a new consumer: fills in a consumer id
+ * numbered from consumerCounter, the destination, selector and prefetch size.
+ */
+p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
+{
+    p<SessionId>    sessionId = sessionInfo->getSessionId() ;
+    p<ConsumerId>   id        = new ConsumerId() ;
+    p<ConsumerInfo> info      = new ConsumerInfo() ;
+    p<string>       filter    = ( selector == NULL ) ? NULL : new string(selector) ;
+
+    id->setConnectionId( sessionId->getConnectionId() ) ;
+    id->setSessionId( sessionId->getValue() ) ;
+    {
+        LOCKED_SCOPE (mutex);
+        id->setValue( ++consumerCounter ) ;
+    }
+
+    // TODO complete packet
+    info->setConsumerId( id ) ;
+    info->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+    info->setSelector( filter ) ;
+    info->setPrefetchSize( prefetchSize ) ;
+    return info ;
+}
+
+/*
+ * Builds the ProducerInfo command for a new producer: fills in a producer
+ * id numbered from producerCounter and the destination.
+ */
+p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
+{
+    p<SessionId>    sessionId = sessionInfo->getSessionId() ;
+    p<ProducerId>   id        = new ProducerId() ;
+    p<ProducerInfo> info      = new ProducerInfo() ;
+
+    id->setConnectionId( sessionId->getConnectionId() ) ;
+    id->setSessionId( sessionId->getValue() ) ;
+    {
+        LOCKED_SCOPE (mutex);
+        id->setValue( ++producerCounter ) ;
+    }
+
+    // TODO complete packet
+    info->setProducerId( id ) ;
+    info->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+    return info ;
+}
+
+/*
+ * Configures the message command (not implemented yet: currently a no-op).
+ */
+void Session::configure(p<IMessage> message)
+{
+    // TODO: nothing is configured yet
+}

Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp Wed Jul  5 20:17:58 2006
@@ -1,149 +1,149 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/TransactionContext.hpp"
-#include "activemq/Session.hpp"
-
-using namespace apache::activemq;
-
-/*
- * 
- */
-TransactionContext::TransactionContext(p<Session> session)
-{
-    this->session       = session ;
-    this->transactionId = NULL ;
-}
-
-/*
- * 
- */
-TransactionContext::~TransactionContext()
-{
-    // no-op
-}
-
-/*
- * 
- */
-p<TransactionId> TransactionContext::getTransactionId()
-{
-    return transactionId ;
-}
-
-/*
- * 
- */
-void TransactionContext::addSynchronization(p<ISynchronization> synchronization)
-{
-    synchronizations.push_back(synchronization) ;
-}
-
-/*
- * 
- */
-void TransactionContext::begin()
-{
-    if( transactionId == NULL )
-    {
-        // Create a new local transaction id
-        transactionId = session->getConnection()->createLocalTransactionId() ;
-
-        // Create a new transaction command
-        p<TransactionInfo> info = new TransactionInfo() ;
-        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
-        info->setTransactionId( transactionId ) ;
-        info->setType( BeginTx ) ;
-
-        // Send begin command to broker
-        session->getConnection()->oneway(info) ;
-    }
-}
-
-/*
- * 
- */
-void TransactionContext::commit()
-{
-    list< p<ISynchronization> >::const_iterator tempIter ;
-
-    // Iterate through each synchronization and call beforeCommit()
-    for( tempIter = synchronizations.begin() ;
-         tempIter != synchronizations.end() ;
-         tempIter++ )
-    {
-        (*tempIter)->beforeCommit() ;
-    }
-
-    if( transactionId != NULL )
-    {
-        // Create a new transaction command
-        p<TransactionInfo> info = new TransactionInfo() ;
-        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
-        info->setTransactionId( transactionId ) ;
-        info->setType( CommitOnePhaseTx ) ;
-
-        // Reset transaction
-        transactionId = NULL ;
-
-        // Send commit command to broker
-        session->getConnection()->oneway(info) ;
-    }
-
-    // Iterate through each synchronization and call afterCommit()
-    for( tempIter = synchronizations.begin() ;
-         tempIter != synchronizations.end() ;
-         tempIter++ )
-    {
-        (*tempIter)->afterCommit() ;
-    }
-
-    // Clear all syncronizations
-    synchronizations.clear() ;
-}
-
-/*
- * 
- */
-void TransactionContext::rollback()
-{
-    if( transactionId != NULL )
-    {
-        // Create a new transaction command
-        p<TransactionInfo> info = new TransactionInfo() ;
-        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
-        info->setTransactionId( transactionId ) ;
-        info->setType( RollbackTx ) ;
-
-        // Reset transaction
-        transactionId = NULL ;
-
-        // Send rollback command to broker
-        session->getConnection()->oneway(info) ;
-    }
-
-    list< p<ISynchronization> >::const_iterator tempIter ;
-
-    // Iterate through each synchronization and call afterRollback()
-    for( tempIter = synchronizations.begin() ;
-         tempIter != synchronizations.end() ;
-         tempIter++ )
-    {
-        (*tempIter)->afterRollback() ;
-    }
-    // Clear all syncronizations
-    synchronizations.clear() ;
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/TransactionContext.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * Binds this context to the given session; starts with no transaction id (NULL).
+ */
+TransactionContext::TransactionContext(p<Session> session)
+{
+    this->session       = session ;
+    this->transactionId = NULL ;
+}
+
+/*
+ * Destructor; performs no explicit cleanup.
+ */
+TransactionContext::~TransactionContext()
+{
+    // no-op
+}
+
+/*
+ * Returns the stored transaction id (NULL until begin() assigns one; reset to NULL by commit()/rollback()).
+ */
+p<TransactionId> TransactionContext::getTransactionId()
+{
+    return transactionId ;
+}
+
+/*
+ * Appends a synchronization to the list that commit() and rollback() notify and then clear.
+ */
+void TransactionContext::addSynchronization(p<ISynchronization> synchronization)
+{
+    synchronizations.push_back(synchronization) ;
+}
+
+/*
+ * Starts a local transaction when none is active; does nothing if transactionId is already set.
+ */
+void TransactionContext::begin()
+{
+    if( transactionId == NULL )
+    {
+        // Create a new local transaction id
+        transactionId = session->getConnection()->createLocalTransactionId() ;
+
+        // Create a new transaction command
+        p<TransactionInfo> info = new TransactionInfo() ;
+        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+        info->setTransactionId( transactionId ) ;
+        info->setType( BeginTx ) ;
+
+        // Send begin command to broker
+        session->getConnection()->oneway(info) ;
+    }
+}
+
+/*
+ * Notifies synchronizations (beforeCommit), sends CommitOnePhaseTx if a transaction is active, notifies afterCommit.
+ */
+void TransactionContext::commit()
+{
+    list< p<ISynchronization> >::const_iterator tempIter ;
+
+    // Iterate through each synchronization and call beforeCommit()
+    for( tempIter = synchronizations.begin() ;
+         tempIter != synchronizations.end() ;
+         tempIter++ )
+    {
+        (*tempIter)->beforeCommit() ;
+    }
+
+    if( transactionId != NULL )
+    {
+        // Create a new transaction command
+        p<TransactionInfo> info = new TransactionInfo() ;
+        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+        info->setTransactionId( transactionId ) ;
+        info->setType( CommitOnePhaseTx ) ;
+
+        // Reset transaction (the id was already handed to info above)
+        transactionId = NULL ;
+
+        // Send commit command to broker
+        session->getConnection()->oneway(info) ;
+    }
+
+    // Iterate through each synchronization and call afterCommit() (runs even if no transaction was active)
+    for( tempIter = synchronizations.begin() ;
+         tempIter != synchronizations.end() ;
+         tempIter++ )
+    {
+        (*tempIter)->afterCommit() ;
+    }
+
+    // Clear all synchronizations
+    synchronizations.clear() ;
+}
+
+/*
+ * Sends RollbackTx if a transaction is active, then calls afterRollback() on every synchronization and clears them.
+ */
+void TransactionContext::rollback()
+{
+    if( transactionId != NULL )
+    {
+        // Create a new transaction command
+        p<TransactionInfo> info = new TransactionInfo() ;
+        info->setConnectionId( session->getConnection()->getConnectionId() ) ;
+        info->setTransactionId( transactionId ) ;
+        info->setType( RollbackTx ) ;
+
+        // Reset transaction (the id was already handed to info above)
+        transactionId = NULL ;
+
+        // Send rollback command to broker
+        session->getConnection()->oneway(info) ;
+    }
+
+    list< p<ISynchronization> >::const_iterator tempIter ;
+
+    // Iterate through each synchronization and call afterRollback() (runs even if no transaction was active)
+    for( tempIter = synchronizations.begin() ;
+         tempIter != synchronizations.end() ;
+         tempIter++ )
+    {
+        (*tempIter)->afterRollback() ;
+    }
+    // Clear all synchronizations
+    synchronizations.clear() ;
+}

Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/TransactionContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp Wed Jul  5 20:17:58 2006
@@ -1,391 +1,391 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/command/ActiveMQBytesMessage.hpp"
-
-using namespace apache::activemq::command;
-
-
-/*
- * 
- */
-ActiveMQBytesMessage::ActiveMQBytesMessage()
-{
-    this->bis      = NULL ;
-    this->bos      = new ByteArrayOutputStream() ;
-    this->dis      = NULL ;
-    this->dos      = new DataOutputStream( bos ) ;
-    this->readMode = false ;
-}
-
-/*
- * 
- */
-ActiveMQBytesMessage::ActiveMQBytesMessage(char* body, int size)
-{
-    // Convert body to SP array
-    array<char> buffer = array<char> (size) ;
-    memcpy(buffer.c_array(), body, size);
-
-    this->bis      = NULL ;
-    this->bos      = new ByteArrayOutputStream(buffer) ;
-    this->dis      = NULL ;
-    this->dos      = new DataOutputStream( bos ) ;
-    this->readMode = false ;
-}
-
-/*
- * 
- */
-ActiveMQBytesMessage::~ActiveMQBytesMessage()
-{
-}
-
-/*
- * 
- */
-unsigned char ActiveMQBytesMessage::getDataStructureType()
-{
-    return ActiveMQBytesMessage::TYPE ;
-}
-
-/*
- *
- */
-void ActiveMQBytesMessage::reset()
-{
-    if( readMode )
-    {
-        this->bos      = new ByteArrayOutputStream( bis->toArray() ) ;
-        this->bis      = NULL ;
-        this->dos      = new DataOutputStream( bos ) ;
-        this->dis      = NULL ;
-        this->readMode = false ;
-    }
-    else
-    {
-        this->bis      = new ByteArrayInputStream( bos->toArray() ) ;
-        this->bos      = NULL ;
-        this->dis      = new DataInputStream( bis ) ;
-        this->dos      = NULL ;
-        this->readMode = true ;
-    }
-}
-
-/*
- *
- */
-char ActiveMQBytesMessage::readByte() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a single byte
-        return dis->readByte() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-int ActiveMQBytesMessage::readBytes(char* buffer, int offset, int length) throw (MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read some bytes
-        return dis->read(buffer, offset, length) ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-bool ActiveMQBytesMessage::readBoolean() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a boolean
-        return dis->readBoolean() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-double ActiveMQBytesMessage::readDouble() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a double
-        return dis->readDouble() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-float ActiveMQBytesMessage::readFloat() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a float
-        return dis->readFloat() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-short ActiveMQBytesMessage::readShort() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a short
-        return dis->readShort() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-int ActiveMQBytesMessage::readInt() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read an integer
-        return dis->readInt() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-long long ActiveMQBytesMessage::readLong() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a long long
-        return dis->readLong() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- *
- */
-p<string> ActiveMQBytesMessage::readString() throw(MessageNotReadableException, MessageEOFException)
-{
-    // Assert read mode
-    if( !readMode )
-        throw MessageNotReadableException() ;
-
-    try
-    {
-        // Read a string
-        return dis->readString() ;
-    }
-    catch( EOFException eof )
-    {
-        throw MessageEOFException() ;
-    }
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeByte(char value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a single byte
-    dos->writeByte(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeBytes(char* value, int offset, int length) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write some bytes
-    dos->write(value, offset, length) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeBoolean(bool value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a boolean
-    dos->writeBoolean(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeDouble(double value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a double
-    dos->writeDouble(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeFloat(float value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a float
-    dos->writeFloat(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeInt(int value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write an integer
-    dos->writeInt(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeLong(long long value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a long long
-    dos->writeLong(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeShort(short value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a short
-    dos->writeShort(value) ;
-}
-
-/*
- * 
- */
-void ActiveMQBytesMessage::writeString(const char* value) throw (MessageNotWritableException)
-{
-    // Assert write mode
-    if( readMode )
-        throw MessageNotWritableException() ;
-
-    // Write a string
-    p<string> v = new string(value) ;
-    dos->writeString(v) ;
-}
-
-/*
- *
- */
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+
+using namespace apache::activemq::command;
+
+
+/*
+ * Creates an empty message in write mode: output streams are allocated, input streams are NULL.
+ */
+ActiveMQBytesMessage::ActiveMQBytesMessage()
+{
+    this->bis      = NULL ;
+    this->bos      = new ByteArrayOutputStream() ;
+    this->dis      = NULL ;
+    this->dos      = new DataOutputStream( bos ) ;
+    this->readMode = false ;
+}
+
+/*
+ * Creates a message in write mode whose output stream is constructed from a copy of 'size' bytes of 'body'.
+ */
+ActiveMQBytesMessage::ActiveMQBytesMessage(char* body, int size)
+{
+    // Convert body to SP array. NOTE(review): body is not checked for NULL nor size for < 0 - confirm callers
+    array<char> buffer = array<char> (size) ;
+    memcpy(buffer.c_array(), body, size);
+
+    this->bis      = NULL ;
+    this->bos      = new ByteArrayOutputStream(buffer) ;
+    this->dis      = NULL ;
+    this->dos      = new DataOutputStream( bos ) ;
+    this->readMode = false ;
+}
+
+/*
+ * Destructor; the body is empty, no explicit cleanup is performed here.
+ */
+ActiveMQBytesMessage::~ActiveMQBytesMessage()
+{
+}
+
+/*
+ * Returns the ActiveMQBytesMessage::TYPE constant as this object's data structure type.
+ */
+unsigned char ActiveMQBytesMessage::getDataStructureType()
+{
+    return ActiveMQBytesMessage::TYPE ;
+}
+
+/*
+ * Toggles between read and write mode, building the new mode's streams from the old stream's byte array.
+ */
+void ActiveMQBytesMessage::reset()
+{
+    if( readMode )
+    {
+        this->bos      = new ByteArrayOutputStream( bis->toArray() ) ;   // must read bis before it is dropped below
+        this->bis      = NULL ;
+        this->dos      = new DataOutputStream( bos ) ;
+        this->dis      = NULL ;
+        this->readMode = false ;
+    }
+    else
+    {
+        this->bis      = new ByteArrayInputStream( bos->toArray() ) ;    // must read bos before it is dropped below
+        this->bos      = NULL ;
+        this->dis      = new DataInputStream( bis ) ;
+        this->dos      = NULL ;
+        this->readMode = true ;
+    }
+}
+
+/*
+ * Reads one byte; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+char ActiveMQBytesMessage::readByte() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a single byte
+        return dis->readByte() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads bytes into buffer via DataInputStream::read(buffer, offset, length) and returns its result.
+ */
+int ActiveMQBytesMessage::readBytes(char* buffer, int offset, int length) throw (MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read some bytes
+        return dis->read(buffer, offset, length) ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a boolean; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+bool ActiveMQBytesMessage::readBoolean() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a boolean
+        return dis->readBoolean() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a double; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+double ActiveMQBytesMessage::readDouble() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a double
+        return dis->readDouble() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a float; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+float ActiveMQBytesMessage::readFloat() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a float
+        return dis->readFloat() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a short; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+short ActiveMQBytesMessage::readShort() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a short
+        return dis->readShort() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads an int; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+int ActiveMQBytesMessage::readInt() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read an integer
+        return dis->readInt() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a long long; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+long long ActiveMQBytesMessage::readLong() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a long long
+        return dis->readLong() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Reads a string; throws MessageNotReadableException in write mode, MessageEOFException on stream EOF.
+ */
+p<string> ActiveMQBytesMessage::readString() throw(MessageNotReadableException, MessageEOFException)
+{
+    // Assert read mode
+    if( !readMode )
+        throw MessageNotReadableException() ;
+
+    try
+    {
+        // Read a string
+        return dis->readString() ;
+    }
+    catch( const EOFException& )   // by reference: no copy or slicing of the exception
+    {
+        throw MessageEOFException() ;
+    }
+}
+
+/*
+ * Writes one byte to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeByte(char value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a single byte
+    dos->writeByte(value) ;
+}
+
+/*
+ * Passes (value, offset, length) to DataOutputStream::write; throws MessageNotWritableException in read mode.
+ */
+void ActiveMQBytesMessage::writeBytes(char* value, int offset, int length) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write some bytes
+    dos->write(value, offset, length) ;
+}
+
+/*
+ * Writes a boolean to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeBoolean(bool value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a boolean
+    dos->writeBoolean(value) ;
+}
+
+/*
+ * Writes a double to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeDouble(double value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a double
+    dos->writeDouble(value) ;
+}
+
+/*
+ * Writes a float to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeFloat(float value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a float
+    dos->writeFloat(value) ;
+}
+
+/*
+ * Writes an int to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeInt(int value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write an integer
+    dos->writeInt(value) ;
+}
+
+/*
+ * Writes a long long to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeLong(long long value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a long long
+    dos->writeLong(value) ;
+}
+
+/*
+ * Writes a short to the output stream; throws MessageNotWritableException when in read mode.
+ */
+void ActiveMQBytesMessage::writeShort(short value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a short
+    dos->writeShort(value) ;
+}
+
+/*
+ * Wraps value in a new string and writes it via writeString; throws MessageNotWritableException in read mode.
+ */
+void ActiveMQBytesMessage::writeString(const char* value) throw (MessageNotWritableException)
+{
+    // Assert write mode
+    if( readMode )
+        throw MessageNotWritableException() ;
+
+    // Write a string. NOTE(review): value is not checked for NULL before constructing the string - confirm callers
+    p<string> v = new string(value) ;
+    dos->writeString(v) ;
+}
+
+/*
+ *
+ */
 int ActiveMQBytesMessage::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> ostream) throw (IOException)
 {
     int size = 0 ;
@@ -402,9 +402,9 @@
     return size ;
 }
 
-/*
- *
- */
+/*
+ *
+ */
 void ActiveMQBytesMessage::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> istream) throw (IOException)
 {
     // Note! Message content unmarshalling is done in super class

Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQBytesMessage.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp?rev=419422&r1=419421&r2=419422&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp Wed Jul  5 20:17:58 2006
@@ -1,352 +1,352 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "activemq/DestinationFilter.hpp"
-#include "activemq/command/ActiveMQDestination.hpp"
-#include "activemq/command/ActiveMQTempQueue.hpp"
-#include "activemq/command/ActiveMQTempTopic.hpp"
-#include "activemq/command/ActiveMQQueue.hpp"
-#include "activemq/command/ActiveMQTopic.hpp"
-
-using namespace apache::activemq::command;
-
-
-// --- Static initialization ----------------------------------------
-
-const char* ActiveMQDestination::ADVISORY_PREFIX            = "ActiveMQ.Advisory." ;
-const char* ActiveMQDestination::CONSUMER_ADVISORY_PREFIX   = "ActiveMQ.Advisory.Consumers." ;
-const char* ActiveMQDestination::PRODUCER_ADVISORY_PREFIX   = "ActiveMQ.Advisory.Producers." ;
-const char* ActiveMQDestination::CONNECTION_ADVISORY_PREFIX = "ActiveMQ.Advisory.Connections." ;
-const char* ActiveMQDestination::DEFAULT_ORDERED_TARGET     = "coordinator" ;
-
-const char* ActiveMQDestination::TEMP_PREFIX         = "{TD{" ;
-const char* ActiveMQDestination::TEMP_POSTFIX        = "}TD}" ;
-const char* ActiveMQDestination::COMPOSITE_SEPARATOR = "," ;
-const char* ActiveMQDestination::QUEUE_PREFIX        = "queue://" ;
-const char* ActiveMQDestination::TOPIC_PREFIX        = "topic://" ;
-
-
-/*
- * Default constructor
- */
-ActiveMQDestination::ActiveMQDestination()
-{
-    orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
-    physicalName  = new string("") ;
-    exclusive     = false ;
-    ordered       = false ;
-    advisory      = false ;
-}
-
-/*
- * Constructs the destination with a specified physical name.
- *
- * @param   name the physical name for the destination.
- */
-ActiveMQDestination::ActiveMQDestination(const char* name)
-{
-    orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
-    physicalName  = new string(name) ;
-    exclusive     = false ;
-    ordered       = false ;
-    advisory      = ( name != NULL && (physicalName->find(ADVISORY_PREFIX) == 0)) ? true : false ;
-}
-
-/*
- *
- */
-ActiveMQDestination::~ActiveMQDestination()
-{
-    // no-op
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isAdvisory()
-{
-    return advisory ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setAdvisory(bool advisory)
-{
-    this->advisory = advisory ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isConsumerAdvisory()
-{
-    return ( isAdvisory() && (physicalName->find(CONSUMER_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isProducerAdvisory()
-{
-    return ( isAdvisory() && (physicalName->find(PRODUCER_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isConnectionAdvisory()
-{
-    return ( isAdvisory() && (physicalName->find(CONNECTION_ADVISORY_PREFIX) == 0) ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isExclusive()
-{
-    return exclusive ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setExclusive(bool exclusive)
-{
-    this->exclusive = exclusive ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isOrdered()
-{
-    return ordered ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setOrdered(bool ordered)
-{
-    this->ordered = ordered ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getOrderedTarget()
-{
-    return orderedTarget ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setOrderedTarget(const char* target)
-{
-    this->orderedTarget->assign(target) ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getPhysicalName()
-{
-    return physicalName ;
-}
-
-/*
- *
- */
-void ActiveMQDestination::setPhysicalName(const char* name)
-{
-    physicalName->assign(name) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isTopic()
-{
-    return ( getDestinationType() == ACTIVEMQ_TOPIC ||
-             getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ) ; 
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isQueue()
-{
-    return !isTopic() ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isTemporary()
-{
-    return ( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ||
-             getDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE ) ; 
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isComposite()
-{
-    return ( physicalName->find(ActiveMQDestination::COMPOSITE_SEPARATOR) > 0 ) ;
-}
-
-/*
- *
- */
-bool ActiveMQDestination::isWildcard()
-{
-    if( physicalName != NULL )
-    {
-        return ( physicalName->find( DestinationFilter::ANY_CHILD ) >= 0 ||
-                 physicalName->find( DestinationFilter::ANY_DESCENDENT ) >= 0 ) ;
-    }
-    return false ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::toString()
-{
-    return physicalName ;
-}
-
-// --- Static methods ---------------------------------------------
-
-/*
- * A helper method to return a descriptive string for the topic or queue.
- */
-p<string> ActiveMQDestination::inspect(p<ActiveMQDestination> destination)
-{
-    p<string> description = new string() ;
-
-    if( typeid(*destination) == typeid(ITopic) )
-        description->assign("Topic(") ;
-    else
-        description->assign("Queue(") ;
-    
-    description->append( destination->toString()->c_str() ) ;
-    description->append(")") ;
-
-    return description ;
-}
-
-/*
- *
- */
-/*p<ActiveMQDestination> ActiveMQDestination::transform(p<IDestination> destination)
-{
-    p<ActiveMQDestination> result = NULL ;
-
-    if( destination != NULL )
-    {
-        if( typeid(*destination) == typeid(ActiveMQDestination) )
-            result = p_cast<ActiveMQDestination> (destination) ;
-
-        else
-        {
-            if( typeid(ITopic).before(typeid(IDestination)) )
-                result = new ActiveMQTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
-
-            else if( typeid(*destination).before(typeid(IQueue)) )
-                result = new ActiveMQQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
-
-            else if( typeid(ITemporaryQueue).before(typeid(*destination)) )
-                result = new ActiveMQTempQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
-
-            else if( typeid(ITemporaryTopic).before(typeid(*destination)) )
-                result = new ActiveMQTempTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
-
-        } 
-    }
-    return result ;
-}*/
-
-/*
- *
- */
-p<ActiveMQDestination> ActiveMQDestination::createDestination(int type, const char* physicalName)
-{
-    p<ActiveMQDestination> result = NULL ;
-
-    if( type == ActiveMQDestination::ACTIVEMQ_TOPIC )
-        result = new ActiveMQTopic(physicalName) ;
-
-    else if( type == ActiveMQDestination::ACTIVEMQ_TEMPORARY_TOPIC )
-        result = new ActiveMQTempTopic(physicalName) ;
-
-    else if (type == ActiveMQDestination::ACTIVEMQ_QUEUE)
-        result = new ActiveMQQueue(physicalName) ;
-
-    else
-        result = new ActiveMQTempQueue(physicalName) ;
-
-    return result ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::createTemporaryName(const char* clientId)
-{
-    p<string> tempName = new string() ;
-
-    tempName->assign( ActiveMQDestination::TEMP_PREFIX ) ;
-    tempName->append(clientId) ;
-    tempName->append( ActiveMQDestination::TEMP_POSTFIX ) ;
-
-    return tempName ;
-}
-
-/*
- *
- */
-p<string> ActiveMQDestination::getClientId(p<ActiveMQDestination> destination)
-{
-    p<string> answer = NULL ;
-
-    if( destination != NULL && destination->isTemporary() )
-    {
-        p<string> name = destination->getPhysicalName() ;
-        int      start = (int)name->find(TEMP_PREFIX),
-                 stop ;
-
-        if( start >= 0 )
-        {
-            start += (int)strlen(TEMP_PREFIX) ;
-            stop   = (int)name->find_last_of(TEMP_POSTFIX) ;
-
-            if( stop > start && stop < (int)name->length() )
-                answer->assign( name->substr(start, stop) ) ;
-        } 
-    }
-    return answer; 
-}
-
-/*
- *
- */
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/DestinationFilter.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+
+using namespace apache::activemq::command;
+
+
+// --- Static initialization ----------------------------------------
+// Advisory name prefixes (matched against the start of the physical name) and the default ordered target
+const char* ActiveMQDestination::ADVISORY_PREFIX            = "ActiveMQ.Advisory." ;
+const char* ActiveMQDestination::CONSUMER_ADVISORY_PREFIX   = "ActiveMQ.Advisory.Consumers." ;
+const char* ActiveMQDestination::PRODUCER_ADVISORY_PREFIX   = "ActiveMQ.Advisory.Producers." ;
+const char* ActiveMQDestination::CONNECTION_ADVISORY_PREFIX = "ActiveMQ.Advisory.Connections." ;
+const char* ActiveMQDestination::DEFAULT_ORDERED_TARGET     = "coordinator" ;
+// Temporary-name delimiters, composite separator, and queue/topic prefixes (presumably URI schemes - confirm usage)
+const char* ActiveMQDestination::TEMP_PREFIX         = "{TD{" ;
+const char* ActiveMQDestination::TEMP_POSTFIX        = "}TD}" ;
+const char* ActiveMQDestination::COMPOSITE_SEPARATOR = "," ;
+const char* ActiveMQDestination::QUEUE_PREFIX        = "queue://" ;
+const char* ActiveMQDestination::TOPIC_PREFIX        = "topic://" ;
+
+
+/*
+ * Default constructor: empty physical name, default ordered target, all flags false.
+ */
+ActiveMQDestination::ActiveMQDestination()
+{
+    orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
+    physicalName  = new string("") ;
+    exclusive     = false ;
+    ordered       = false ;
+    advisory      = false ;
+}
+
+/*
+ * Constructs the destination with a specified physical name.
+ *
+ * The advisory flag is set when the name starts with ADVISORY_PREFIX.
+ * A NULL name is treated as an empty name.
+ *
+ * @param   name the physical name for the destination, may be NULL.
+ */
+ActiveMQDestination::ActiveMQDestination(const char* name)
+{
+    orderedTarget = new string(DEFAULT_ORDERED_TARGET) ;
+
+    // Building a std::string from a null pointer is undefined behaviour,
+    // so substitute the empty string before the string is created.
+    physicalName  = new string( name != NULL ? name : "" ) ;
+    exclusive     = false ;
+    ordered       = false ;
+
+    // find() == 0 means the name begins with the advisory prefix
+    advisory      = ( physicalName->find(ADVISORY_PREFIX) == 0 ) ;
+}
+
+/*
+ * Destructor. Performs no explicit clean-up.
+ */
+ActiveMQDestination::~ActiveMQDestination()
+{
+}
+
+/*
+ * Returns the advisory flag, which is derived from the name in the
+ * constructor or set explicitly through setAdvisory().
+ */
+bool ActiveMQDestination::isAdvisory()
+{
+    return this->advisory ;
+}
+
+/*
+ * Sets the advisory flag.
+ *
+ * @param   advisory the new value of the flag.
+ */
+void ActiveMQDestination::setAdvisory(bool advisory)
+{
+    this->advisory = advisory ;
+}
+
+/*
+ * Returns true when the advisory flag is set and the physical name
+ * starts with CONSUMER_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isConsumerAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return physicalName->find(CONSUMER_ADVISORY_PREFIX) == 0 ;
+}
+
+/*
+ * Returns true when the advisory flag is set and the physical name
+ * starts with PRODUCER_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isProducerAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return physicalName->find(PRODUCER_ADVISORY_PREFIX) == 0 ;
+}
+
+/*
+ * Returns true when the advisory flag is set and the physical name
+ * starts with CONNECTION_ADVISORY_PREFIX.
+ */
+bool ActiveMQDestination::isConnectionAdvisory()
+{
+    if( !isAdvisory() )
+        return false ;
+
+    return physicalName->find(CONNECTION_ADVISORY_PREFIX) == 0 ;
+}
+
+/*
+ * Returns the exclusive flag (false unless changed via setExclusive()).
+ */
+bool ActiveMQDestination::isExclusive()
+{
+    return this->exclusive ;
+}
+
+/*
+ * Sets the exclusive flag.
+ *
+ * @param   exclusive the new value of the flag.
+ */
+void ActiveMQDestination::setExclusive(bool exclusive)
+{
+    this->exclusive = exclusive ;
+}
+
+/*
+ * Returns the ordered flag (false unless changed via setOrdered()).
+ */
+bool ActiveMQDestination::isOrdered()
+{
+    return this->ordered ;
+}
+
+/*
+ * Sets the ordered flag.
+ *
+ * @param   ordered the new value of the flag.
+ */
+void ActiveMQDestination::setOrdered(bool ordered)
+{
+    this->ordered = ordered ;
+}
+
+/*
+ * Returns the ordered-target string held by this destination
+ * (DEFAULT_ORDERED_TARGET unless changed via setOrderedTarget()).
+ */
+p<string> ActiveMQDestination::getOrderedTarget()
+{
+    return this->orderedTarget ;
+}
+
+/*
+ * Replaces the ordered target. The existing string object is
+ * overwritten in place rather than being swapped for a new one.
+ *
+ * @param   target the new ordered target; must not be NULL.
+ */
+void ActiveMQDestination::setOrderedTarget(const char* target)
+{
+    orderedTarget->assign(target) ;
+}
+
+/*
+ * Returns the physical name held by this destination.
+ */
+p<string> ActiveMQDestination::getPhysicalName()
+{
+    return this->physicalName ;
+}
+
+/*
+ * Replaces the physical name. The existing string object is overwritten
+ * in place. Note that the advisory flag is not re-evaluated here.
+ *
+ * @param   name the new physical name; must not be NULL.
+ */
+void ActiveMQDestination::setPhysicalName(const char* name)
+{
+    this->physicalName->assign(name) ;
+}
+
+/*
+ * Returns true when the destination type is a topic or a temporary topic.
+ */
+bool ActiveMQDestination::isTopic()
+{
+    if( getDestinationType() == ACTIVEMQ_TOPIC )
+        return true ;
+
+    return ( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC ) ;
+}
+
+/*
+ * Returns true for every destination that isTopic() does not report
+ * as a topic.
+ */
+bool ActiveMQDestination::isQueue()
+{
+    return isTopic() ? false : true ;
+}
+
+/*
+ * Returns true when the destination type is a temporary topic or a
+ * temporary queue.
+ */
+bool ActiveMQDestination::isTemporary()
+{
+    if( getDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC )
+        return true ;
+
+    return ( getDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE ) ;
+}
+
+/*
+ * Returns true when the physical name contains COMPOSITE_SEPARATOR.
+ *
+ * A missing (NULL) physical name is reported as not composite.
+ */
+bool ActiveMQDestination::isComposite()
+{
+    if( physicalName == NULL )
+        return false ;
+
+    // find() returns string::npos (an unsigned maximum) when nothing is
+    // found, so the result has to be compared against npos, not zero.
+    return ( physicalName->find(ActiveMQDestination::COMPOSITE_SEPARATOR) != string::npos ) ;
+}
+
+/*
+ * Returns true when the physical name contains either of the wildcard
+ * tokens DestinationFilter::ANY_CHILD or DestinationFilter::ANY_DESCENDENT.
+ *
+ * A missing (NULL) physical name is reported as not being a wildcard.
+ */
+bool ActiveMQDestination::isWildcard()
+{
+    if( physicalName != NULL )
+    {
+        // find() yields an unsigned position; a ">= 0" test is always
+        // true, so "not found" must be detected via string::npos.
+        return ( physicalName->find( DestinationFilter::ANY_CHILD ) != string::npos ||
+                 physicalName->find( DestinationFilter::ANY_DESCENDENT ) != string::npos ) ;
+    }
+    return false ;
+}
+
+/*
+ * Returns the string form of the destination, which is its physical name.
+ */
+p<string> ActiveMQDestination::toString()
+{
+    return this->physicalName ;
+}
+
+// --- Static methods ---------------------------------------------
+
+/*
+ * A helper method to return a descriptive string for the topic or queue.
+ *
+ * The result has the form "Topic(<name>)" when destination->isTopic()
+ * is true (topics and temporary topics) and "Queue(<name>)" otherwise,
+ * where <name> is the destination's toString() value.
+ *
+ * @param   destination the destination to describe; must not be NULL.
+ * @return  a newly created description string.
+ */
+p<string> ActiveMQDestination::inspect(p<ActiveMQDestination> destination)
+{
+    p<string> description = new string() ;
+
+    // Comparing typeid(*destination) with typeid(ITopic) can never match,
+    // because the dynamic type is always a concrete destination class.
+    // Ask the destination for its kind instead.
+    if( destination->isTopic() )
+        description->assign("Topic(") ;
+    else
+        description->assign("Queue(") ;
+    
+    description->append( destination->toString()->c_str() ) ;
+    description->append(")") ;
+
+    return description ;
+}
+
+/*
+ *
+ */
+/*p<ActiveMQDestination> ActiveMQDestination::transform(p<IDestination> destination)
+{
+    p<ActiveMQDestination> result = NULL ;
+
+    if( destination != NULL )
+    {
+        if( typeid(*destination) == typeid(ActiveMQDestination) )
+            result = p_cast<ActiveMQDestination> (destination) ;
+
+        else
+        {
+            if( typeid(ITopic).before(typeid(IDestination)) )
+                result = new ActiveMQTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
+
+            else if( typeid(*destination).before(typeid(IQueue)) )
+                result = new ActiveMQQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
+
+            else if( typeid(ITemporaryQueue).before(typeid(*destination)) )
+                result = new ActiveMQTempQueue( (p_cast<IQueue> (destination))->getQueueName()->c_str() ) ;
+
+            else if( typeid(ITemporaryTopic).before(typeid(*destination)) )
+                result = new ActiveMQTempTopic( (p_cast<ITopic> (destination))->getTopicName()->c_str() ) ;
+
+        } 
+    }
+    return result ;
+}*/
+
+/*
+ * Factory that creates the concrete destination matching a type code.
+ *
+ * ACTIVEMQ_TOPIC, ACTIVEMQ_TEMPORARY_TOPIC and ACTIVEMQ_QUEUE map to
+ * their respective classes; any other value yields a temporary queue.
+ *
+ * @param   type         one of the ACTIVEMQ_* destination type constants.
+ * @param   physicalName the physical name handed to the new destination.
+ * @return  the newly created destination.
+ */
+p<ActiveMQDestination> ActiveMQDestination::createDestination(int type, const char* physicalName)
+{
+    p<ActiveMQDestination> destination = NULL ;
+
+    if( type == ACTIVEMQ_TOPIC )
+    {
+        destination = new ActiveMQTopic(physicalName) ;
+    }
+    else if( type == ACTIVEMQ_TEMPORARY_TOPIC )
+    {
+        destination = new ActiveMQTempTopic(physicalName) ;
+    }
+    else if( type == ACTIVEMQ_QUEUE )
+    {
+        destination = new ActiveMQQueue(physicalName) ;
+    }
+    else
+    {
+        // Every remaining type code falls back to a temporary queue
+        destination = new ActiveMQTempQueue(physicalName) ;
+    }
+
+    return destination ;
+}
+
+/*
+ * Builds a temporary destination name by wrapping the client id in
+ * TEMP_PREFIX and TEMP_POSTFIX.
+ *
+ * @param   clientId the client id to embed; must not be NULL.
+ * @return  a newly created string "<TEMP_PREFIX><clientId><TEMP_POSTFIX>".
+ */
+p<string> ActiveMQDestination::createTemporaryName(const char* clientId)
+{
+    p<string> wrapped = new string() ;
+
+    wrapped->assign( TEMP_PREFIX ) ;
+    wrapped->append( clientId ).append( TEMP_POSTFIX ) ;
+
+    return wrapped ;
+}
+
+/*
+ * Extracts the client id embedded in a temporary destination name, i.e.
+ * the text between the first TEMP_PREFIX and the last TEMP_POSTFIX.
+ *
+ * @param   destination the destination to examine, may be NULL.
+ * @return  a new string holding the client id, or NULL when the
+ *          destination is NULL, is not temporary, or its name does not
+ *          contain a non-empty id between the two markers.
+ */
+p<string> ActiveMQDestination::getClientId(p<ActiveMQDestination> destination)
+{
+    p<string> answer = NULL ;
+
+    if( destination != NULL && destination->isTemporary() )
+    {
+        p<string> name = destination->getPhysicalName() ;
+
+        if( name != NULL )
+        {
+            string::size_type start = name->find(TEMP_PREFIX) ;
+
+            if( start != string::npos )
+            {
+                // Skip over the prefix itself
+                start += strlen(TEMP_PREFIX) ;
+
+                // rfind() locates the last occurrence of the whole postfix;
+                // find_last_of() would match any single character of it.
+                string::size_type stop = name->rfind(TEMP_POSTFIX) ;
+
+                // substr() takes a length, not an end index. The result
+                // must go into a new string because answer starts as NULL.
+                if( stop != string::npos && stop > start )
+                    answer = new string( name->substr(start, stop - start) ) ;
+            }
+        }
+    }
+    return answer ;
+}
+
+/*
+ *
+ */
 int ActiveMQDestination::marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> ostream) throw (IOException)
 {
     int size = 0 ;
@@ -355,9 +355,9 @@
     return size ;
 }
 
-/*
- *
- */
+/*
+ *
+ */
 void ActiveMQDestination::unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> istream) throw (IOException)
 {
     physicalName = p_cast<string>(marshaller->unmarshalString(mode, istream)) ; 

Propchange: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/command/ActiveMQDestination.cpp
------------------------------------------------------------------------------
    svn:eol-style = native