You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/04/27 23:59:44 UTC

svn commit: r397654 [2/12] - in /incubator/activemq/trunk/openwire-cpp: ./ src/ src/command/ src/gram/ src/gram/java/ src/gram/java/org/ src/gram/java/org/apache/ src/gram/java/org/apache/activemq/ src/gram/java/org/apache/activemq/openwire/ src/gram/j...

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Connection.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+
+// --- Constructors -------------------------------------------------
+
+/*
+ *
+ */
+Connection::Connection(p<ITransport> transport, p<ConnectionInfo> connectionInfo)
+{
+    this->transport               = transport ;
+    this->connectionInfo          = connectionInfo ;
+    this->acknowledgementMode     = AutoAckMode ;
+    this->connected               = false ;
+    this->closed                  = false ;
+    this->brokerInfo              = NULL ;
+    this->brokerWireFormat        = NULL ;
+    this->sessionCounter          = 0 ;
+    this->tempDestinationCounter  = 0 ;
+    this->localTransactionCounter = 0 ;
+
+    // Hook up as a command listener and start dispatching
+    transport->setCommandListener(smartify(this)) ;
+    transport->start() ;
+}
+
+/*
+ *
+ */
+Connection::~Connection()
+{
+    // Make sure connection is closed
+    close() ;
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ *
+ */
+void Connection::setExceptionListener(p<IExceptionListener> listener)
+{
+    this->listener = listener ;
+}
+
+/*
+ *
+ */
+p<IExceptionListener> Connection::getExceptionListener()
+{
+    return listener ;
+}
+
+/*
+ *
+ */
+p<ITransport> Connection::getTransport()
+{
+    return transport ;
+}
+
+/*
+ *
+ */
+void Connection::setTransport(p<ITransport> transport)
+{
+    this->transport = transport ;
+}
+
+/*
+ *
+ */
+p<string> Connection::getClientId()
+{
+    return connectionInfo->getClientId() ;
+}
+
+/*
+ *
+ */
+void Connection::setClientId(const char* value) throw (CmsException)
+{
+    if( connected )
+        throw CmsException("You cannot change the client id once a connection is established") ; 
+
+    p<string> clientId = new string(value) ;
+    connectionInfo->setClientId( clientId ) ;
+}
+
+
+/*
+ *
+ */
+p<BrokerInfo> Connection::getBrokerInfo()
+{
+    return brokerInfo ;
+}
+
+/*
+ *
+ */
+p<WireFormatInfo> Connection::getBrokerWireFormat()
+{
+    return brokerWireFormat ;
+}
+
+/*
+ *
+ */
+AcknowledgementMode Connection::getAcknowledgementMode()
+{
+    return acknowledgementMode ;
+}
+
+/*
+ *
+ */
+void Connection::setAcknowledgementMode(AcknowledgementMode ackMode)
+{
+    acknowledgementMode = ackMode ;
+}
+
+/*
+ *
+ */
+p<ConnectionId> Connection::getConnectionId()
+{
+    return connectionInfo->getConnectionId() ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ *
+ */
+void Connection::close()
+{
+    if( !closed )
+    {
+        list< p<ISession> >::iterator sessionIter ;
+
+        // Iterate through all sessions and close them down
+        for( sessionIter = sessions.begin() ;
+             sessionIter != sessions.end() ;
+             sessionIter++ )
+        {
+            (*sessionIter)->close() ;
+        }
+        // Empty session list
+        sessions.clear() ;
+
+        // Remove connection from broker
+        disposeOf( getConnectionId() ) ;
+
+        // Finally, transmit a shutdown command to broker
+        transport->oneway( new ShutdownInfo() ) ;
+        closed = true ;
+    }
+}
+
+/*
+ *
+ */
+p<ISession> Connection::createSession() throw(CmsException)
+{
+    return createSession(acknowledgementMode) ;
+}
+
+/*
+ *
+ */
+p<ISession> Connection::createSession(AcknowledgementMode ackMode) throw(CmsException)
+{
+    p<SessionInfo> sessionInfo = createSessionInfo( ackMode ) ;
+
+    // Send session info to broker
+    syncRequest(sessionInfo) ;
+
+    p<ISession> session = new Session(smartify(this), sessionInfo, ackMode) ;
+    sessions.push_back(session) ;
+
+    return session ;
+}
+
+/*
+ * Performs a synchronous request-response with the broker.
+ */
+p<Response> Connection::syncRequest(p<ICommand> command) throw(CmsException) 
+{
+    checkConnected() ;
+    
+    p<Response> response = transport->request(command) ;
+
+    if( response->getDataStructureType() == ExceptionResponse::TYPE )
+    {
+        p<ExceptionResponse> exceptionResponse = p_cast<ExceptionResponse> (response) ;
+        p<BrokerError>       brokerError = exceptionResponse->getException() ;
+        string               message ;
+
+        // Build error message
+        message.assign("Request failed: ") ;
+        message.append( brokerError->getExceptionClass()->c_str() ) ;
+        message.append(", ") ;
+        message.append( brokerError->getStackTrace()->c_str() ) ;
+
+        // TODO: Change to CMSException()
+        throw CmsException( message.c_str() ) ; 
+    }
+    return response ;
+}
+
+/*
+ *
+ */
+void Connection::oneway(p<ICommand> command) throw(CmsException)
+{
+    checkConnected() ;
+    transport->oneway(command) ;
+}
+
+/*
+ *
+ */
+void Connection::disposeOf(p<IDataStructure> dataStructure) throw(CmsException)
+{
+    p<RemoveInfo> command = new RemoveInfo() ;
+    command->setObjectId( dataStructure ) ;
+    syncRequest(command) ;
+}
+
+/*
+ * Creates a new temporary destination name.
+ */
+p<string> Connection::createTemporaryDestinationName()
+{
+    p<string> name = new string() ;
+    char*     buffer = new char[15] ;
+
+    {
+        LOCKED_SCOPE( mutex );
+
+        name->assign( connectionInfo->getConnectionId()->getValue()->c_str() ) ;
+        name->append( ":" ) ;
+#ifdef unix
+        sprintf(buffer, "%lld", ++tempDestinationCounter) ;
+#else
+        sprintf(buffer, "%I64d", ++tempDestinationCounter) ;
+#endif
+        name->append( buffer ) ;
+    }
+
+    return name ;
+}
+
+/*
+ * Creates a new local transaction ID.
+ */
+p<LocalTransactionId> Connection::createLocalTransactionId()
+{
+    p<LocalTransactionId> id = new LocalTransactionId() ;
+
+    id->setConnectionId( getConnectionId() ) ;
+
+    {
+        LOCKED_SCOPE (mutex);
+        id->setValue( ++localTransactionCounter ) ;
+    }
+
+    return id ;
+}
+
+
+// --- Implementation methods ---------------------------------------
+
+/*
+ *
+ */
+p<SessionInfo> Connection::createSessionInfo(AcknowledgementMode ackMode)
+{
+    p<SessionInfo> sessionInfo = new SessionInfo() ;
+    p<SessionId>   sessionId   = new SessionId() ;
+    
+    sessionId->setConnectionId ( connectionInfo->getConnectionId()->getValue() ) ;
+
+    {
+        LOCKED_SCOPE( mutex );
+        sessionId->setValue( ++sessionCounter ) ; 
+    }
+
+    sessionInfo->setSessionId( sessionId ) ;
+    return sessionInfo ; 
+}
+
+/*
+ *
+ */
+void Connection::checkConnected() throw(CmsException) 
+{
+    if( closed )
+        throw ConnectionClosedException("Oops! Connection already closed.") ;
+
+    if( !connected )
+    {
+        connected = true ;
+
+        // Send the connection info and see if we get an ack/nak
+        syncRequest( connectionInfo ) ;
+    } 
+}
+
+/*
+ * Handle incoming commands.
+ */
+void Connection::onCommand(p<ITransport> transport, p<ICommand> command)
+{
+    if( command->getDataStructureType() == MessageDispatch::TYPE )
+    {
+        p<MessageDispatch> dispatch = p_cast<MessageDispatch> (command) ;
+        p<ConsumerId> consumerId = dispatch->getConsumerId() ;
+        p<MessageConsumer> consumer = NULL ;
+        list< p<ISession> >::const_iterator tempIter ;
+
+        // Iterate through all sessions and check if one has the consumer
+        for( tempIter = sessions.begin() ;
+             tempIter != sessions.end() ;
+             tempIter++ )
+        {
+            consumer = p_cast<Session> (*tempIter)->getConsumer(consumerId) ;
+
+            // Found a match?
+            if( consumer != NULL )
+                break ;
+        }
+        if( consumer == NULL )
+            cout << "ERROR: No such consumer active: " << consumerId->getValue() << endl ;
+        else
+        {
+            p<ActiveMQMessage> message = p_cast<ActiveMQMessage> (dispatch->getMessage()) ;
+            consumer->dispatch(message) ;
+        }
+    }
+    else if( command->getDataStructureType() == WireFormatInfo::TYPE )
+        this->brokerWireFormat = p_cast<WireFormatInfo> (command) ;
+
+    else if( command->getDataStructureType() == BrokerInfo::TYPE )
+        this->brokerInfo = p_cast<BrokerInfo> (command) ;
+
+    else
+        cout << "ERROR: Unknown command: " << command->getDataStructureType() << endl ;
+}
+
+/*
+ * Handle incoming broker errors.
+ */
+void Connection::onError(p<ITransport> transport, exception& error)
+{
+    if( listener != NULL )
+        this->listener->onException(error) ;
+    else
+        cout << "ERROR: Received a broker exception: " << error.what() << endl ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_Connection_hpp_
+#define ActiveMQ_Connection_hpp_
+
+#include <iostream>
+#include <list>
+#include <string>
+#include <map>
+
+#include "cms/ISession.hpp"
+#include "cms/IConnection.hpp"
+#include "cms/IExceptionListener.hpp"
+#include "cms/CmsException.hpp"
+#include "activemq/ConnectionClosedException.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/command/BrokerInfo.hpp"
+#include "activemq/command/WireFormatInfo.hpp"
+#include "activemq/command/ExceptionResponse.hpp"
+#include "activemq/command/ConnectionInfo.hpp"
+#include "activemq/command/LocalTransactionId.hpp"
+#include "activemq/command/MessageDispatch.hpp"
+#include "activemq/command/SessionInfo.hpp"
+#include "activemq/command/SessionId.hpp"
+#include "activemq/command/ShutdownInfo.hpp"
+#include "activemq/transport/ITransport.hpp"
+#include "activemq/transport/ICommandListener.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace std;
+    using namespace ifr;
+    using namespace apache::cms;
+    using namespace apache::activemq::command;
+    using namespace apache::activemq::transport;
+    using namespace apache::ppr::thread;
+    using namespace apache::ppr::util;
+
+/*
+ * 
+ */
+class Connection : public IConnection, public ICommandListener
+{
+private:
+    p<ConnectionInfo>     connectionInfo ;
+    p<ITransport>         transport ;
+    p<BrokerInfo>         brokerInfo ;        // from MQ broker
+    p<WireFormatInfo>     brokerWireFormat ;  // from MQ broker
+    p<IExceptionListener> listener ;
+    list< p<ISession> >   sessions ;
+    bool                  connected,
+                          closed ;
+    AcknowledgementMode   acknowledgementMode ;
+    long long             sessionCounter,
+                          tempDestinationCounter,
+                          localTransactionCounter ;
+    SimpleMutex           mutex ;
+
+public:
+    // Constructors
+    Connection(p<ITransport> transport, p<ConnectionInfo> connectionInfo) ;
+    virtual ~Connection() ;
+
+    // Attribute methods
+    virtual void setExceptionListener(p<IExceptionListener> listener) ;
+    virtual p<IExceptionListener> getExceptionListener() ;
+    virtual p<ITransport> getTransport() ;
+    virtual void setTransport(p<ITransport> transport) ;
+    virtual p<string> getClientId() ;
+    virtual void setClientId(const char* value) throw (CmsException) ;
+    virtual p<BrokerInfo> getBrokerInfo() ;
+    virtual p<WireFormatInfo> getBrokerWireFormat() ;
+    virtual AcknowledgementMode getAcknowledgementMode() ;
+    virtual void setAcknowledgementMode(AcknowledgementMode mode) ;
+//    virtual void addConsumer(p<ConsumerId> consumerId, p<MessageConsumer> messageConsumer) ;
+//    virtual void removeConsumer(p<ConsumerId> consumerId) ;
+    virtual p<ConnectionId> getConnectionId() ;
+
+    // Operation methods
+    virtual p<ISession> createSession() throw(CmsException) ;
+    virtual p<ISession> createSession(AcknowledgementMode mode) throw(CmsException) ;
+    virtual p<Response> syncRequest(p<ICommand> command) throw(CmsException) ;
+    virtual void oneway(p<ICommand> command) throw(CmsException) ;
+    virtual void disposeOf(p<IDataStructure> dataStructure) throw(CmsException) ;
+    virtual p<string> createTemporaryDestinationName() ;
+    virtual p<LocalTransactionId> createLocalTransactionId() ;
+    virtual void close() ;
+
+protected:
+    // Implementation methods
+    p<SessionInfo> createSessionInfo(AcknowledgementMode mode) ;
+    void checkConnected() throw(CmsException) ;
+    void onCommand(p<ITransport> transport, p<ICommand> command) ;
+    void onError(p<ITransport> transport, exception& error) ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_Connection_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConnectionClosedException.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+ConnectionClosedException::ConnectionClosedException(const char* message)
+    : CmsException(message)
+{
+    // no-op
+}
+

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionClosedException_hpp_
+#define ActiveMQ_ConnectionClosedException_hpp_
+
+#include "cms/CmsException.hpp"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace apache::cms;
+
+/*
+ * Signals that a connection is being used when it is already closed.
+ */
+class ConnectionClosedException : public CmsException
+{
+public:
+    ConnectionClosedException(const char* message) ;
+};
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ConnectionClosedException_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionException_hpp_
+#define ActiveMQ_ConnectionException_hpp_
+
+#include "ppr/TraceException.hpp"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace std ;
+    using namespace apache::ppr;
+
+/*
+ * Signals that a connection error has occured.
+ */
+class ConnectionException : public TraceException
+{
+public:
+    ConnectionException() : TraceException()
+       { /* no-op */ } ;
+    ConnectionException(const char *const& msg) : TraceException(msg)
+       { /* no-op */ } ;
+    ConnectionException(const char* fileName, int lineNo, const char* msg) : TraceException(msg)
+       { /* no-op */ } ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ConnectionException_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConnectionFactory.hpp"
+#include "activemq/Connection.hpp"
+#include "activemq/protocol/openwire/OpenWireProtocol.hpp"
+#include "activemq/transport/tcp/TcpTransport.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+ConnectionFactory::ConnectionFactory()
+{
+    // Use default URI
+    brokerUri        = new Uri ("tcp://localhost:61616?wireFormat=openwire") ;
+    username         = NULL ;
+    password         = NULL ;
+    clientId         = Guid::getGuidString() ;
+    transportFactory = new TransportFactory() ;
+}
+
+/*
+ *
+ */
+ConnectionFactory::ConnectionFactory(p<Uri> brokerUri)
+{
+    this->brokerUri  = brokerUri;
+    username         = NULL ;
+    password         = NULL ;
+    clientId         = Guid::getGuidString() ;
+    transportFactory = new TransportFactory() ;
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ *
+ */
+p<Uri> ConnectionFactory::getBrokerUri()
+{
+     return brokerUri ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setBrokerUri(p<Uri> brokerUri)
+{
+    this->brokerUri = brokerUri ;
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getUsername()
+{
+    return username ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setUsername(const char* username)
+{
+    this->username = new string(username) ;
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getPassword()
+{
+    return password ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setPassword(const char* password)
+{
+    this->password = new string(password);
+
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getClientId()
+{
+    return clientId ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setClientId(const char* clientId)
+{
+    this->clientId = new string(clientId) ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ *
+ */
+p<IConnection> ConnectionFactory::createConnection() throw (ConnectionException)
+{
+    return createConnection( (username != NULL) ? username->c_str() : NULL,
+                             (password != NULL) ? password->c_str() : NULL ) ;
+}
+
+/*
+ *
+ */
+p<IConnection> ConnectionFactory::createConnection(const char* username, const char* password) throw (ConnectionException)
+{
+    p<ConnectionInfo> connectionInfo ;
+    p<ITransport>     transport ;
+    p<Connection>     connection ;
+
+
+    // Set up a new connection object
+    connectionInfo = createConnectionInfo(username, password) ;
+    transport      = createTransport() ;
+    connection     = new Connection(transport, connectionInfo) ;
+    connection->setClientId( clientId->c_str() ) ;
+
+    return connection ;
+}
+
+
+// --- Implementation methods ---------------------------------------
+
+/*
+ *
+ */
+p<ConnectionInfo> ConnectionFactory::createConnectionInfo(const char* username, const char* password)
+{
+    p<ConnectionInfo> connectionInfo = new ConnectionInfo() ;
+    p<ConnectionId>   connectionId   = new ConnectionId() ;
+    p<string>         uid = (username != NULL) ? new string(username) : NULL ;
+    p<string>         pwd = (password != NULL) ? new string(password) : NULL ;
+
+    connectionId->setValue( Guid::getGuidString() ) ;
+    connectionInfo->setConnectionId( connectionId ) ;
+    connectionInfo->setUserName( uid ) ;
+    connectionInfo->setPassword( pwd ) ;
+    connectionInfo->setClientId( clientId ) ;
+
+    return connectionInfo ;
+}
+
+/*
+ *
+ */
+p<ITransport> ConnectionFactory::createTransport() throw (ConnectionException)
+{
+    try
+    {
+    	// Create a transport for given URI
+        return transportFactory->createTransport( brokerUri ) ;
+    }
+    catch( SocketException se )
+    {
+        throw ConnectionException(__FILE__, __LINE__, "Failed to connect socket") ;
+    }
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionFactory_hpp_
+#define ActiveMQ_ConnectionFactory_hpp_
+
+// Must be included before any STL includes
+#include "ppr/util/Guid.hpp"
+
+#include <string>
+#include "cms/IConnection.hpp"
+#include "cms/IConnectionFactory.hpp"
+#include "activemq/ConnectionException.hpp"
+#include "activemq/command/ConnectionInfo.hpp"
+#include "activemq/command/ConnectionId.hpp"
+#include "activemq/protocol/IProtocol.hpp"
+#include "activemq/transport/ITransport.hpp"
+#include "activemq/transport/ITransportFactory.hpp"
+#include "activemq/transport/TransportFactory.hpp"
+#include "ppr/net/Uri.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+      using namespace apache::activemq::command;
+      using namespace apache::activemq::protocol;
+      using namespace apache::activemq::transport;
+      using namespace apache::ppr::net;
+      using namespace ifr;
+
+/*
+ * 
+ */
+class ConnectionFactory : public IConnectionFactory
+{
+private:
+    p<Uri>               brokerUri ;
+    p<string>            username ;
+    p<string>            password ;
+    p<string>            clientId ;
+    p<IProtocol>         protocol ;
+    p<ITransportFactory> transportFactory ;
+
+public:
+    // Constructors
+    ConnectionFactory() ;
+    ConnectionFactory(p<Uri> uri) ;
+
+    // Attribute methods
+    virtual p<Uri> getBrokerUri() ;
+    virtual void setBrokerUri(p<Uri> brokerUri) ;
+    virtual p<string> getUsername() ;
+    virtual void setUsername(const char* username) ;
+    virtual p<string> getPassword() ;
+    virtual void setPassword(const char* password) ;
+    virtual p<string> getClientId() ;
+    virtual void setClientId(const char* clientId) ;
+
+    // Operation methods
+    virtual p<IConnection> createConnection() throw (ConnectionException) ;
+    virtual p<IConnection> createConnection(const char* username, const char* password) throw (ConnectionException) ;
+
+protected:
+    // Implementation methods
+    virtual p<ConnectionInfo> createConnectionInfo(const char* username, const char* password) ;
+    virtual p<ITransport> createTransport() throw (ConnectionException) ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ConnectionFactory_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConsumerClosedException.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+ConsumerClosedException::ConsumerClosedException(const char* message)
+    : CmsException(message)
+{
+    // no-op
+}
+

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConsumerClosedException_hpp_
+#define ActiveMQ_ConsumerClosedException_hpp_
+
+#include "cms/CmsException.hpp"
+
+namespace apache
+{
+  namespace activemq
+  {
+      using namespace apache::cms;
+
+/*
+ * Signals that a consumer is being used when it is already closed.
+ */
+class ConsumerClosedException : public CmsException
+{
+public:
+    ConsumerClosedException(const char* message) ;
+};
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ConsumerClosedException_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/DestinationFilter.hpp"
+
+using namespace apache::activemq;
+
+// Init static constants
+const char* DestinationFilter::ANY_DESCENDENT = ">" ;
+const char* DestinationFilter::ANY_CHILD      = "*" ;
+
+/*
+ * 
+ */
+DestinationFilter::DestinationFilter()
+{
+    // no-op
+}
+
+/*
+ * 
+ */
+DestinationFilter::~DestinationFilter()
+{
+    // no-op
+}
+
+/*
+ * 
+ */
+bool DestinationFilter::matches(p<ActiveMQMessage> message)
+{
+    return matches( message->getDestination() ) ; 
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_DestinationFilter_hpp_
+#define ActiveMQ_DestinationFilter_hpp_
+
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    using namespace apache::activemq::command;
+
+/*
+ * 
+ */
+class DestinationFilter
+{
+public:
+    const static char* ANY_DESCENDENT ;
+    const static char* ANY_CHILD ;
+
+public:
+    DestinationFilter() ;
+    virtual ~DestinationFilter() ;
+
+    virtual bool matches(p<ActiveMQMessage> message) ;
+    virtual bool matches(p<ActiveMQDestination> destination) = 0 ;
+};
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_DestinationFilter_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Dispatcher.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+Dispatcher::Dispatcher()
+{
+    dispatchQueue = new queue< p<IMessage> > ;
+    redeliverList = new list< p<IMessage> > ;
+}
+
+/*
+ * 
+ */
+void Dispatcher::redeliverRolledBackMessages()
+{
+    LOCKED_SCOPE (mutex);
+
+    p<queue< p<IMessage> > > replacementQueue = new queue< p<IMessage> > ;
+    //(dispatchQueue->size() + redeliverList->size() ) ;
+
+    // Copy all messages to be redelivered to the new queue
+    while( !redeliverList->empty() )
+    {
+        replacementQueue->push( redeliverList->front() ) ;
+        redeliverList->pop_front() ;
+    }
+
+    // Copy all messages to be dispatched to the new queue
+    while( dispatchQueue->size() > 0 )
+    {
+        // Get first element in queue
+        p<IMessage> element = p_cast<IMessage> (dispatchQueue->front()) ;
+
+        // Remove first element from queue
+        dispatchQueue->pop() ;
+
+        // Add element to the new queue
+        replacementQueue->push(element) ;
+    }
+    // Switch to the new queue
+    dispatchQueue = replacementQueue ;
+
+    semaphore.notify() ;
+}
+
+/*
+ * 
+ */
+void Dispatcher::redeliver(p<IMessage> message)
+{
+    LOCKED_SCOPE (mutex);
+    redeliverList->push_back(message) ;
+}
+
+/*
+ * 
+ */
+void Dispatcher::enqueue(p<IMessage> message)
+{
+    LOCKED_SCOPE (mutex);
+    dispatchQueue->push(message) ;
+    semaphore.notify() ;
+}
+
+/*
+ * 
+ */
+p<IMessage> Dispatcher::dequeueNoWait()
+{
+    p<IMessage> msg = NULL ;
+
+    {
+        LOCKED_SCOPE (mutex);
+
+        if( dispatchQueue->size() > 0 )
+        {
+            msg = p_cast<IMessage> (dispatchQueue->front()) ;
+            dispatchQueue->pop() ;
+        }
+    }        
+    return msg ;
+}
+
+/*
+ * 
+ */
+p<IMessage> Dispatcher::dequeue(int timeout)
+{
+    p<IMessage> msg = NULL ;
+
+    {
+        LOCKED_SCOPE (mutex);
+
+        if( dispatchQueue->size() == 0 )
+            semaphore.wait(timeout) ;
+
+        if( dispatchQueue->size() > 0 )
+        {
+            msg = p_cast<IMessage> (dispatchQueue->front()) ;
+            dispatchQueue->pop() ;
+        }
+    }
+    return msg ;
+}
+
+/*
+ * 
+ */
+p<IMessage> Dispatcher::dequeue()
+{
+    p<IMessage> msg = NULL ;
+
+    {
+        LOCKED_SCOPE (mutex);
+
+        msg = p_cast<IMessage> (dispatchQueue->front()) ;
+        dispatchQueue->pop() ;
+    }
+
+    return msg ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_Dispatcher_hpp_
+#define ActiveMQ_Dispatcher_hpp_
+
+#include <string>
+#include <list>
+#include <queue>
+#include "cms/IMessage.hpp"
+#include "activemq/command/Response.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    using namespace apache::activemq::command;
+    using namespace apache::cms;
+    using namespace apache::ppr::thread;
+
+/*
+ * Handles the multi-threaded dispatching between transport and consumers.
+ */
+class Dispatcher
+{
+private:
+    p<queue< p<IMessage> > > dispatchQueue ;
+    p<list< p<IMessage> > >  redeliverList ;
+    SimpleMutex              mutex ;
+    Semaphore                semaphore ;
+
+public:
+    Dispatcher() ;
+    virtual ~Dispatcher() {}
+
+    virtual void redeliverRolledBackMessages() ;
+    virtual void redeliver(p<IMessage> message) ;
+    virtual void enqueue(p<IMessage> message) ;
+    virtual p<IMessage> dequeueNoWait() ;
+    virtual p<IMessage> dequeue(int timeout) ;
+    virtual p<IMessage> dequeue() ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_Dispatcher_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_IAcknowledger_hpp_
+#define ActiveMQ_IAcknowledger_hpp_
+
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    namespace command
+    {
+        class ActiveMQMessage ;
+    }
+    using namespace ifr;
+    using namespace apache::activemq::command;
+
+/*
+ * Interface for message acknowledgers.
+ */
+struct IAcknowledger : Interface
+{
+    virtual void acknowledge(p<ActiveMQMessage> message) = 0 ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_IAcknowledger_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ICommand_hpp_
+#define ActiveMQ_ICommand_hpp_
+
+#include "activemq/IDataStructure.hpp"
+
+namespace apache
+{
+  namespace activemq
+  {
+
+/*
+ * An OpenWire command
+ */
+struct ICommand : IDataStructure
+{
+    virtual int getCommandId() = 0;
+    virtual void setCommandId(int value) = 0;
+    virtual bool getResponseRequired() = 0;
+    virtual void setResponseRequired(bool value) = 0;
+};
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ICommand_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_IDataStructure_hpp_
+#define ActiveMQ_IDataStructure_hpp_
+
+#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    namespace protocol
+    {
+      struct IMarshaller ;
+    }
+    using namespace ifr;
+    using namespace apache::activemq::protocol;
+    using namespace apache::ppr::io;
+
+/*
+ * An OpenWire data structure.
+ */
+struct IDataStructure : Interface
+{
+    virtual unsigned char getDataStructureType() = 0 ;
+    virtual bool isMarshallAware() = 0 ;
+    virtual int marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) = 0 ;
+    virtual void unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) = 0 ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_IDataStructure_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ISynchronization_hpp_
+#define ActiveMQ_ISynchronization_hpp_
+
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+
+/*
+ * 
+ */
+struct ISynchronization : Interface
+{
+    // Called before a commit
+    virtual void beforeCommit() = 0 ;
+    
+    // Called after a commit
+    virtual void afterCommit() = 0 ;
+    
+    // Called after a transaction rollback
+    virtual void afterRollback() = 0 ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_ISynchronization_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageAckType_hpp_
+#define ActiveMQ_MessageAckType_hpp_
+
+namespace apache
+{
+  namespace activemq
+  {
+    enum MessageAckType
+    {
+      DeliveredAck = 0,
+      PoisonAck    = 1,
+      ConsumedAck  = 2
+    } ;
+  }
+}
+
+#endif /*ActiveMQ_MessageAckType_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+MessageConsumer::MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode)
+{
+    this->session                = session ;
+    this->consumerInfo           = consumerInfo ;
+    this->acknowledgementMode    = acknowledgementMode ;
+    this->dispatcher             = new Dispatcher() ;
+    this->listener               = NULL ;
+    this->closed                 = false ;
+    this->maximumRedeliveryCount = 10 ;
+    this->redeliveryTimeout      = 500 ;
+}
+
+/*
+ *
+ */
+MessageConsumer::~MessageConsumer()
+{
+    // Make sure consumer is closed
+    close() ;
+}
+
+// Attribute methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::setMessageListener(p<IMessageListener> listener)
+{
+    this->listener = listener ;
+}
+
+/*
+ *
+ */
+p<IMessageListener> MessageConsumer::getMessageListener()
+{
+    return listener ;
+}
+
+/*
+ *
+ */
+p<ConsumerId> MessageConsumer::getConsumerId()
+{
+    return consumerInfo->getConsumerId() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setMaximumRedeliveryCount(int count)
+{
+    this->maximumRedeliveryCount = count ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getMaximumRedeliveryCount()
+{
+    return maximumRedeliveryCount ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setRedeliveryTimeout(int timeout)
+{
+    this->redeliveryTimeout = timeout ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getRedeliveryTimeout()
+{
+    return redeliveryTimeout ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive()
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeue() ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive(int timeout)
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeue(timeout) ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receiveNoWait()
+{
+    checkClosed() ;
+    return autoAcknowledge( dispatcher->dequeueNoWait() ) ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::redeliverRolledBackMessages()
+{
+    dispatcher->redeliverRolledBackMessages() ;
+}
+
+/*
+ * Transport callback that handles messages dispatching
+ */
+void MessageConsumer::dispatch(p<IMessage> message)
+{
+    dispatcher->enqueue(message) ;
+
+    // Activate background dispatch thread if async listener is set up
+    if( listener != NULL )
+        session->dispatch() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::dispatchAsyncMessages()
+{
+    while( listener != NULL )
+    {
+        p<IMessage> message = dispatcher->dequeueNoWait() ;
+
+        if( message != NULL )
+        {
+            listener->onMessage(message) ;
+
+            // Auto acknowledge message if selected
+            autoAcknowledge(message) ;
+        }
+        else
+            break ;
+    }
+}
+
+/*
+ * IAcknowledger callback method.
+ */
+void MessageConsumer::acknowledge(p<ActiveMQMessage> message)
+{
+    doClientAcknowledge(message) ;
+}
+
+/*
+ * 
+ */
+void MessageConsumer::close()
+{
+    if( !closed )
+    {
+        closed = true ;
+    
+        // De-register consumer from broker
+        session->getConnection()->disposeOf( consumerInfo->getConsumerId() ) ;
+
+        // Reset internal state (prevent cyclic references)
+        session = NULL ;
+    }
+}
+
+
+// Implementation methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::checkClosed() throw(CmsException) 
+{
+    if( closed )
+        throw ConnectionClosedException("Oops! Connection already closed") ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::autoAcknowledge(p<IMessage> message)
+{
+    try
+    {
+        // Is the message an ActiveMQMessage? (throws bad_cast otherwise)
+        p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
+
+        // Register the handler for client acknowledgment
+        activeMessage->setAcknowledger( smartify(this) ) ;
+
+        if( acknowledgementMode != ClientAckMode )
+            doAcknowledge(activeMessage) ;
+    }
+    catch( bad_cast& bc )
+    {
+        // ignore
+    }
+    return message ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::doClientAcknowledge(p<ActiveMQMessage> message)
+{
+    if( acknowledgementMode == ClientAckMode )
+        doAcknowledge(message);
+}
+
+/*
+ *
+ */
+void MessageConsumer::doAcknowledge(p<Message> message)
+{
+    p<MessageAck> ack = createMessageAck(message) ;
+    //cout << "Sending Ack: " << ack->getAckType() << endl ;
+    session->getConnection()->syncRequest(ack) ;
+}
+
+/*
+ *
+ */
+p<MessageAck> MessageConsumer::createMessageAck(p<Message> message)
+{
+    p<MessageAck> ack = new MessageAck() ;
+
+    // Set ack properties
+    ack->setAckType( ConsumedAck ) ;
+    ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+    ack->setDestination( message->getDestination() ) ;
+    ack->setFirstMessageId( message->getMessageId() ) ;
+    ack->setLastMessageId( message->getMessageId() ) ;
+    ack->setMessageCount( 1 ) ;
+    
+    if( session->isTransacted() )
+    {
+        session->doStartTransaction() ;
+        ack->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+        session->getTransactionContext()->addSynchronization( new MessageConsumerSynchronization(smartify(this), message) ) ;
+    }
+    return ack ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::afterRollback(p<ActiveMQMessage> message)
+{
+    // Try redeliver of the message again
+    message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ) ;
+
+    // Check if redeliver count has exceeded maximum
+    if( message->getRedeliveryCounter() > maximumRedeliveryCount )
+    {
+        // Send back a poisoned pill
+        p<MessageAck> ack = new MessageAck() ;
+        ack->setAckType( PoisonAck ) ;
+        ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+        ack->setDestination( message->getDestination() ) ;
+        ack->setFirstMessageId( message->getMessageId() ) ;
+        ack->setLastMessageId( message->getMessageId() ) ;
+        ack->setMessageCount( 1 ) ;
+        session->getConnection()->oneway(ack) ;
+    }
+    else
+    {
+        dispatcher->redeliver(message) ;
+        
+        // Re-dispatch the message at some point in the future
+        if( listener != NULL )
+            session->dispatch( redeliveryTimeout ) ;
+    }
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumer_hpp_
+#define ActiveMQ_MessageConsumer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageConsumer.hpp"
+#include "cms/IMessageListener.hpp"
+#include "cms/CmsException.hpp"
+#include "activemq/AcknowledgementMode.hpp"
+#include "activemq/MessageAckType.hpp"
+#include "activemq/Dispatcher.hpp"
+#include "activemq/IAcknowledger.hpp"
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/ConnectionClosedException.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/Message.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "ppr/util/ifr/p"
+#include "ppr/thread/Thread.hpp"
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+namespace apache
+{
+  namespace activemq
+  {
+      using namespace ifr;
+      using namespace apache::cms;
+      using namespace apache::ppr::thread;
+      class Session ;
+
+/*
+ * 
+ */
+class MessageConsumer : public IMessageConsumer, public IAcknowledger
+{
+private:
+    p<Session>          session ;
+    p<ConsumerInfo>     consumerInfo ;
+    p<Dispatcher>       dispatcher ;
+    p<IMessageListener> listener ;
+    AcknowledgementMode acknowledgementMode ;
+    bool                closed ;
+    int                 maximumRedeliveryCount,
+                        redeliveryTimeout ;
+
+public:
+    MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode) ;
+    virtual ~MessageConsumer() ;
+
+    virtual void setMessageListener(p<IMessageListener> listener) ;
+    virtual p<IMessageListener> getMessageListener() ;
+    virtual p<ConsumerId> getConsumerId() ;
+    virtual void setMaximumRedeliveryCount(int count) ;
+    virtual int getMaximumRedeliveryCount() ;
+    virtual void setRedeliveryTimeout(int timeout) ;
+    virtual int getRedeliveryTimeout() ;
+
+    virtual p<IMessage> receive() ;
+    virtual p<IMessage> receive(int timeout) ;
+    virtual p<IMessage> receiveNoWait() ;
+    virtual void redeliverRolledBackMessages() ;
+    virtual void dispatch(p<IMessage> message) ;
+    virtual void dispatchAsyncMessages() ;
+    virtual void afterRollback(p<ActiveMQMessage> message) ;
+    virtual void acknowledge(p<ActiveMQMessage> message) ;
+    virtual void close() ;
+
+protected:
+    void checkClosed() throw(CmsException) ;
+    p<IMessage> autoAcknowledge(p<IMessage> message) ;
+    void doClientAcknowledge(p<ActiveMQMessage> message) ;
+    void doAcknowledge(p<Message> message) ;
+    p<MessageAck> createMessageAck(p<Message> message) ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_IMessageConsumer_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/MessageConsumer.hpp"
+
+using namespace apache::activemq;
+
+/*
+ * 
+ */
+MessageConsumerSynchronization::MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message)
+{
+    this->consumer = consumer ;
+    this->message  = message ;
+}
+
+/*
+ * 
+ */
+MessageConsumerSynchronization::~MessageConsumerSynchronization()
+{
+}
+
+/*
+ * 
+ */
+void MessageConsumerSynchronization::beforeCommit()
+{
+    // no-op
+}
+
+/*
+ * 
+ */
+void MessageConsumerSynchronization::afterCommit()
+{
+    // no-op
+}
+
+/*
+ * 
+ */
+void MessageConsumerSynchronization::afterRollback()
+{
+    consumer->afterRollback( p_cast<ActiveMQMessage> (message)) ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumerSynchronization_hpp_
+#define ActiveMQ_MessageConsumerSynchronization_hpp_
+
+#include "activemq/ISynchronization.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/Message.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    class MessageConsumer;
+
+/*
+ * 
+ */
+class MessageConsumerSynchronization : public ISynchronization
+{
+private:
+    p<MessageConsumer> consumer ;
+    p<Message>         message ;
+
+public:
+    MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message) ;
+    ~MessageConsumerSynchronization() ;
+
+    virtual void beforeCommit() ;
+    virtual void afterCommit() ;
+    virtual void afterRollback() ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_MessageConsumerSynchronization_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ * 
+ */
+MessageProducer::MessageProducer(p<Session> session, p<ProducerInfo> producerInfo)
+{
+    this->session                 = session ;
+    this->producerInfo            = producerInfo ;
+    this->priority                = DEFAULT_PRIORITY ;
+    this->timeToLive              = DEFAULT_TIMETOLIVE ;
+    this->messageCounter          = 0 ;
+    this->persistent              = false ;
+    this->disableMessageID        = false ;
+    this->disableMessageTimestamp = false ;
+    this->closed                  = false ;
+}
+
+/*
+ * 
+ */
+MessageProducer::~MessageProducer()
+{
+    // Make sure the producer is closed
+    close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+bool MessageProducer::getPersistent()
+{
+    return persistent ;
+}
+
+void MessageProducer::setPersistent(bool persistent)
+{
+    this->persistent = persistent ;
+}
+
+long long MessageProducer::getTimeToLive()
+{
+    return timeToLive ;
+}
+
+void MessageProducer::getTimeToLive(long long ttl)
+{
+    this->timeToLive = ttl ;
+}
+
+int MessageProducer::getPriority()
+{
+    return priority ;
+}
+
+void MessageProducer::getPriority(int priority)
+{
+    this->priority = priority ;
+}
+
+bool MessageProducer::getDisableMessageID()
+{
+    return disableMessageID ;
+}
+
+void MessageProducer::getDisableMessageID(bool disable)
+{
+    this->disableMessageID = disable ;
+}
+
+bool MessageProducer::getDisableMessageTimestamp()
+{
+    return disableMessageTimestamp ;
+}
+
+void MessageProducer::getDisableMessageTimestamp(bool disable)
+{
+    this->disableMessageTimestamp = disable ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ * 
+ */
+void MessageProducer::send(p<IMessage> message)
+{
+    send(producerInfo->getDestination(), message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
+}
+
+/*
+ * 
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message)
+{
+    send(destination, message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
+}
+
+/*
+ * 
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive)
+{
+    p<MessageId> msgId = new MessageId() ;
+    msgId->setProducerId( producerInfo->getProducerId() ) ;
+
+    // Acquire next sequence id
+    {
+        LOCKED_SCOPE (mutex);
+        msgId->setProducerSequenceId( ++messageCounter ) ;
+    }
+
+    // Configure the message
+    p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
+    activeMessage->setMessageId( msgId ) ;
+    activeMessage->setProducerId( producerInfo->getProducerId() ) ;
+    activeMessage->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+    activeMessage->setPriority(priority) ;
+
+    if( session->isTransacted() )
+    {
+        session->doStartTransaction() ;
+        activeMessage->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+    }
+
+    // Set time values if not disabled
+    if( !this->disableMessageTimestamp )
+    {
+        long long timestamp = Time::getCurrentTimeMillis() ;
+
+        // Set message time stamp/expiration
+        activeMessage->setTimestamp(timestamp) ;
+        if( timeToLive > 0 )
+            activeMessage->setExpiration( timestamp + timeToLive ) ;
+    }
+
+    // Finally, transmit the message
+    session->doSend(destination, message) ;
+}
+
+/*
+ * 
+ */
+void MessageProducer::close()
+{
+    if( !closed )
+    {
+        closed = true ;
+    
+        // De-register producer from broker
+        session->getConnection()->disposeOf( producerInfo->getProducerId() ) ;
+
+        // Reset internal state (prevent cyclic references)
+        session = NULL ;
+    }
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageProducer_hpp_
+#define ActiveMQ_MessageProducer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageProducer.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/Time.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace activemq
+  {
+    using namespace ifr;
+    using namespace apache::cms;
+    using namespace apache::activemq::command;
+    using namespace apache::ppr;
+    using namespace apache::ppr::thread;
+    class Session ;
+
+/*
+ * 
+ */
+class MessageProducer : public IMessageProducer
+{
+private:
+    p<Session>      session ;
+    p<ProducerInfo> producerInfo ;
+    SimpleMutex     mutex ;
+    int             priority ;
+    long long       messageCounter,
+                    timeToLive ;
+    bool            closed,
+                    persistent,
+                    disableMessageID,
+                    disableMessageTimestamp ;
+
+    // Default message values
+    const static char      DEFAULT_PRIORITY   = 4 ;
+    const static long long DEFAULT_TIMETOLIVE = 0 ;
+
+public:
+    MessageProducer(p<Session> session, p<ProducerInfo> producerInfo) ;
+    virtual ~MessageProducer() ;
+
+    // Attribute methods
+	virtual bool getPersistent() ;
+	virtual void setPersistent(bool persistent) ;
+    virtual long long getTimeToLive() ;
+    virtual void getTimeToLive(long long ttl) ;
+    virtual int getPriority() ;
+    virtual void getPriority(int priority) ;
+    virtual bool getDisableMessageID() ;
+    virtual void getDisableMessageID(bool disable) ;
+    virtual bool getDisableMessageTimestamp() ;
+    virtual void getDisableMessageTimestamp(bool disable) ;
+
+    // Operation methods
+    virtual void send(p<IMessage> message) ;
+    virtual void send(p<IDestination> destination, p<IMessage> message) ;
+    virtual void send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive) ;
+    virtual void close() ;
+
+private:
+    long long getCurrentTimeMillis() ;
+} ;
+
+/* namespace */
+  }
+}
+
+#endif /*ActiveMQ_MessageProducer_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,508 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Session.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+#include "activemq/command/ActiveMQMapMessage.hpp"
+#include "activemq/command/ActiveMQTextMessage.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Connection.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ * 
+ */
+Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
+{
+    this->connection         = connection ;
+    this->sessionInfo        = info ;
+    this->ackMode            = ackMode ;
+    this->prefetchSize       = 1000 ;
+    this->consumerCounter    = 0 ;
+    this->producerCounter    = 0 ;
+    this->transactionContext = new TransactionContext(smartify(this)) ;
+    this->dispatchThread     = new DispatchThread(smartify(this)) ;
+    this->closed             = false ;
+
+    // Activate backround dispatch thread
+    dispatchThread->start() ;
+}
+
+/*
+ * 
+ */
+Session::~Session()
+{
+    // Make sure session is closed
+    close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+/*
+ * 
+ */
+bool Session::isTransacted()
+{
+    return ( ackMode == TransactionalAckMode ) ? true : false ; 
+}
+
+/*
+ * 
+ */
+p<Connection> Session::getConnection()
+{
+    return connection ;
+}
+
+/*
+ * 
+ */
+p<SessionId> Session::getSessionId()
+{
+    return sessionInfo->getSessionId() ;
+}
+
+/*
+ * 
+ */
+p<TransactionContext> Session::getTransactionContext()
+{
+    return transactionContext ;
+}
+
+/*
+ * 
+ */
+p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
+{
+    map<long long, p<MessageConsumer> >::iterator tempIter ;
+
+    // Check if key exists in map
+    tempIter = consumers.find( consumerId->getValue() ) ;
+    if( tempIter == consumers.end() )
+        return NULL ;
+    else
+        return tempIter->second ;
+}
+
+/*
+ * 
+ */
+p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
+{
+    map<long long, p<MessageProducer> >::iterator tempIter ;
+
+    // Check if key exists in map
+    tempIter = producers.find( producerId->getValue() ) ;
+    if( tempIter == producers.end() )
+        return NULL ;
+    else
+        return tempIter->second ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ * 
+ */
+p<IMessageProducer> Session::createProducer()
+{
+    return createProducer(NULL) ; 
+}
+
+/*
+ * 
+ */
+p<IMessageProducer> Session::createProducer(p<IDestination> destination)
+{
+    p<ProducerInfo> command  = createProducerInfo(destination) ;
+    p<ProducerId> producerId = command->getProducerId() ;
+
+    try
+    {
+        p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
+
+        // Save the producer
+        producers[ producerId->getValue() ] = producer ;
+
+        // Register producer with broker
+        connection->syncRequest(command) ;
+
+        return producer ;
+    }
+    catch( exception e )
+    {
+        // Make sure producer was removed
+        producers[ producerId->getValue() ] = NULL ;
+        throw e ;
+    }
+}
+
+/*
+ * 
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
+{
+    return createConsumer(destination, NULL) ; 
+}
+
+/*
+ * 
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+
+        return consumer ;
+    }
+    catch( exception e )
+    {
+        // Make sure consumer was removed
+        consumers[ consumerId->getValue() ] = NULL ;
+        throw e ;
+    }
+}
+
+p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
+{
+    p<ConsumerInfo> command  = createConsumerInfo(destination, selector) ;
+    p<ConsumerId> consumerId = command->getConsumerId() ;
+    p<string>     subscriptionName = new string(name) ;
+
+    command->setSubcriptionName( subscriptionName ) ;
+    command->setNoLocal( noLocal ) ;
+    
+    try
+    {
+        p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+        // Save the consumer first in case message dispatching starts immediately
+        consumers[ consumerId->getValue() ] = consumer ;
+
+        // Register consumer with broker
+        connection->syncRequest(command) ;
+
+        return consumer ;
+    }
+    catch( exception e )
+    {
+        // Make sure consumer was removed
+        consumers[ consumerId->getValue() ] = NULL ;
+        throw e ;
+    }
+}
+
+/*
+ * 
+ */
+p<IQueue> Session::getQueue(const char* name)
+{
+    p<IQueue> queue = new ActiveMQQueue(name) ;
+    return queue ;
+}
+
+/*
+ * 
+ */
+p<ITopic> Session::getTopic(const char* name)
+{
+    p<ITopic> topic = new ActiveMQTopic(name) ;
+    return topic ;
+}
+
+/*
+ * 
+ */
+p<ITemporaryQueue> Session::createTemporaryQueue()
+{
+    p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
+    return queue ;
+}
+
+/*
+ * 
+ */
+p<ITemporaryTopic> Session::createTemporaryTopic()
+{
+    p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
+    return topic ;
+}
+
+/*
+ * 
+ */
+p<IMessage> Session::createMessage()
+{
+    p<IMessage> message = new ActiveMQMessage() ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+p<IBytesMessage> Session::createBytesMessage()
+{
+    p<IBytesMessage> message = new ActiveMQBytesMessage() ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+p<IBytesMessage> Session::createBytesMessage(char* body, int size)
+{
+    p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+p<IMapMessage> Session::createMapMessage()
+{
+    p<IMapMessage> message = new ActiveMQMapMessage() ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+p<ITextMessage> Session::createTextMessage()
+{
+    p<ITextMessage> message = new ActiveMQTextMessage() ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+p<ITextMessage> Session::createTextMessage(const char* text)
+{
+    p<ITextMessage> message = new ActiveMQTextMessage(text) ;
+    configure(message) ;
+    return message ;
+}
+
+/*
+ * 
+ */
+void Session::commit() throw(CmsException)
+{
+    if( !isTransacted() )
+        throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+    transactionContext->commit() ;
+}
+
+/*
+ * 
+ */
+void Session::rollback() throw(CmsException)
+{
+    if( !isTransacted() )
+        throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+    transactionContext->rollback() ;
+
+    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+    // Ensure all the consumers redeliver any rolled back messages
+    for( tempIter = consumers.begin() ;
+         tempIter != consumers.end() ;
+         tempIter++ )
+    {
+        ((*tempIter).second)->redeliverRolledBackMessages() ;
+    }
+}
+
+/*
+ * 
+ */
+void Session::doSend(p<IDestination> destination, p<IMessage> message)
+{
+    p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
+    // TODO complete packet
+    connection->syncRequest(command) ;
+}
+
+/*
+ * Starts a new transaction
+ */
+void Session::doStartTransaction()
+{
+    if( isTransacted() )
+        transactionContext->begin() ;
+}
+
+/*
+ * 
+ */
+void Session::dispatch(int delay)
+{
+    if( delay > 0 ) 
+        dispatchThread->sleep(delay) ;
+
+    dispatchThread->wakeup() ;
+}
+
+/*
+ * 
+ */
+void Session::dispatchAsyncMessages()
+{
+    // Ensure that only 1 thread dispatches messages in a consumer at once
+    LOCKED_SCOPE (mutex);
+
+    map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+    // Iterate through each consumer created by this session
+    // ensuring that they have all pending messages dispatched
+    for( tempIter = consumers.begin() ;
+         tempIter != consumers.end() ;
+         tempIter++ )
+    {
+        ((*tempIter).second)->dispatchAsyncMessages() ;
+    }
+}
+
+/*
+ *
+ */
+void Session::close()
+{
+    if( !closed )
+    {
+        map<long long, p<MessageConsumer> >::iterator consumerIter ;
+        map<long long, p<MessageProducer> >::iterator producerIter ;
+
+        // Iterate through all consumers and close them down
+        for( consumerIter = consumers.begin() ;
+             consumerIter != consumers.end() ;
+             consumerIter++ )
+        {
+            consumerIter->second->close() ;
+            consumerIter->second = NULL ;
+        }
+
+        // Iterate through all producers and close them down
+        for( producerIter = producers.begin() ;
+             producerIter != producers.end() ;
+             producerIter++ )
+        {
+            producerIter->second->close() ;
+            producerIter->second = NULL ;
+        }
+        // De-register session from broker
+        connection->disposeOf( sessionInfo->getSessionId() ) ;
+
+        // Clean up
+        connection = NULL ;
+        closed     = true ;
+    }
+}
+
+
+// Implementation methods ------------------------------------------
+
+/*
+ * 
+ */
+p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
+{
+    p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
+    p<ConsumerId> consumerId = new ConsumerId() ;
+
+    consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+    consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+    {
+        LOCKED_SCOPE (mutex);
+        consumerId->setValue( ++consumerCounter ) ;
+    }
+    p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
+
+    // TODO complete packet
+    consumerInfo->setConsumerId( consumerId ) ;
+    consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+    consumerInfo->setSelector( sel ) ;
+    consumerInfo->setPrefetchSize( this->prefetchSize ) ;
+
+    return consumerInfo ;
+}
+
+/*
+ * 
+ */
+p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
+{
+    p<ProducerInfo> producerInfo = new ProducerInfo() ;
+    p<ProducerId> producerId = new ProducerId() ;
+
+    producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+    producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+    {
+        LOCKED_SCOPE (mutex);
+        producerId->setValue( ++producerCounter ) ;
+    }
+
+    // TODO complete packet
+    producerInfo->setProducerId( producerId ) ;
+    producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+
+    return producerInfo ;
+} 
+
+/*
+ * Configures the message command.
+ */
+void Session::configure(p<IMessage> message)
+{
+    // TODO:
+}