You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/04/27 23:59:44 UTC
svn commit: r397654 [2/12] - in /incubator/activemq/trunk/openwire-cpp: ./
src/ src/command/ src/gram/ src/gram/java/ src/gram/java/org/
src/gram/java/org/apache/ src/gram/java/org/apache/activemq/
src/gram/java/org/apache/activemq/openwire/ src/gram/j...
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Connection.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+
+// --- Constructors -------------------------------------------------
+
+/*
+ *
+ */
+Connection::Connection(p<ITransport> transport, p<ConnectionInfo> connectionInfo)
+{
+ this->transport = transport ;
+ this->connectionInfo = connectionInfo ;
+ this->acknowledgementMode = AutoAckMode ;
+ this->connected = false ;
+ this->closed = false ;
+ this->brokerInfo = NULL ;
+ this->brokerWireFormat = NULL ;
+ this->sessionCounter = 0 ;
+ this->tempDestinationCounter = 0 ;
+ this->localTransactionCounter = 0 ;
+
+ // Hook up as a command listener and start dispatching
+ transport->setCommandListener(smartify(this)) ;
+ transport->start() ;
+}
+
+/*
+ *
+ */
+Connection::~Connection()
+{
+ // Make sure connection is closed
+ close() ;
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ *
+ */
+void Connection::setExceptionListener(p<IExceptionListener> listener)
+{
+ this->listener = listener ;
+}
+
+/*
+ *
+ */
+p<IExceptionListener> Connection::getExceptionListener()
+{
+ return listener ;
+}
+
+/*
+ *
+ */
+p<ITransport> Connection::getTransport()
+{
+ return transport ;
+}
+
+/*
+ *
+ */
+void Connection::setTransport(p<ITransport> transport)
+{
+ this->transport = transport ;
+}
+
+/*
+ *
+ */
+p<string> Connection::getClientId()
+{
+ return connectionInfo->getClientId() ;
+}
+
+/*
+ *
+ */
+void Connection::setClientId(const char* value) throw (CmsException)
+{
+ if( connected )
+ throw CmsException("You cannot change the client id once a connection is established") ;
+
+ p<string> clientId = new string(value) ;
+ connectionInfo->setClientId( clientId ) ;
+}
+
+
+/*
+ *
+ */
+p<BrokerInfo> Connection::getBrokerInfo()
+{
+ return brokerInfo ;
+}
+
+/*
+ *
+ */
+p<WireFormatInfo> Connection::getBrokerWireFormat()
+{
+ return brokerWireFormat ;
+}
+
+/*
+ *
+ */
+AcknowledgementMode Connection::getAcknowledgementMode()
+{
+ return acknowledgementMode ;
+}
+
+/*
+ *
+ */
+void Connection::setAcknowledgementMode(AcknowledgementMode ackMode)
+{
+ acknowledgementMode = ackMode ;
+}
+
+/*
+ *
+ */
+p<ConnectionId> Connection::getConnectionId()
+{
+ return connectionInfo->getConnectionId() ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ *
+ */
+void Connection::close()
+{
+ if( !closed )
+ {
+ list< p<ISession> >::iterator sessionIter ;
+
+ // Iterate through all sessions and close them down
+ for( sessionIter = sessions.begin() ;
+ sessionIter != sessions.end() ;
+ sessionIter++ )
+ {
+ (*sessionIter)->close() ;
+ }
+ // Empty session list
+ sessions.clear() ;
+
+ // Remove connection from broker
+ disposeOf( getConnectionId() ) ;
+
+ // Finally, transmit a shutdown command to broker
+ transport->oneway( new ShutdownInfo() ) ;
+ closed = true ;
+ }
+}
+
+/*
+ *
+ */
+p<ISession> Connection::createSession() throw(CmsException)
+{
+ return createSession(acknowledgementMode) ;
+}
+
+/*
+ *
+ */
+p<ISession> Connection::createSession(AcknowledgementMode ackMode) throw(CmsException)
+{
+ p<SessionInfo> sessionInfo = createSessionInfo( ackMode ) ;
+
+ // Send session info to broker
+ syncRequest(sessionInfo) ;
+
+ p<ISession> session = new Session(smartify(this), sessionInfo, ackMode) ;
+ sessions.push_back(session) ;
+
+ return session ;
+}
+
+/*
+ * Performs a synchronous request-response with the broker.
+ */
+p<Response> Connection::syncRequest(p<ICommand> command) throw(CmsException)
+{
+ checkConnected() ;
+
+ p<Response> response = transport->request(command) ;
+
+ if( response->getDataStructureType() == ExceptionResponse::TYPE )
+ {
+ p<ExceptionResponse> exceptionResponse = p_cast<ExceptionResponse> (response) ;
+ p<BrokerError> brokerError = exceptionResponse->getException() ;
+ string message ;
+
+ // Build error message
+ message.assign("Request failed: ") ;
+ message.append( brokerError->getExceptionClass()->c_str() ) ;
+ message.append(", ") ;
+ message.append( brokerError->getStackTrace()->c_str() ) ;
+
+ // TODO: Change to CMSException()
+ throw CmsException( message.c_str() ) ;
+ }
+ return response ;
+}
+
+/*
+ *
+ */
+void Connection::oneway(p<ICommand> command) throw(CmsException)
+{
+ checkConnected() ;
+ transport->oneway(command) ;
+}
+
+/*
+ *
+ */
+void Connection::disposeOf(p<IDataStructure> dataStructure) throw(CmsException)
+{
+ p<RemoveInfo> command = new RemoveInfo() ;
+ command->setObjectId( dataStructure ) ;
+ syncRequest(command) ;
+}
+
+/*
+ * Creates a new temporary destination name.
+ */
+p<string> Connection::createTemporaryDestinationName()
+{
+ p<string> name = new string() ;
+ char* buffer = new char[15] ;
+
+ {
+ LOCKED_SCOPE( mutex );
+
+ name->assign( connectionInfo->getConnectionId()->getValue()->c_str() ) ;
+ name->append( ":" ) ;
+#ifdef unix
+ sprintf(buffer, "%lld", ++tempDestinationCounter) ;
+#else
+ sprintf(buffer, "%I64d", ++tempDestinationCounter) ;
+#endif
+ name->append( buffer ) ;
+ }
+
+ return name ;
+}
+
+/*
+ * Creates a new local transaction ID.
+ */
+p<LocalTransactionId> Connection::createLocalTransactionId()
+{
+ p<LocalTransactionId> id = new LocalTransactionId() ;
+
+ id->setConnectionId( getConnectionId() ) ;
+
+ {
+ LOCKED_SCOPE (mutex);
+ id->setValue( ++localTransactionCounter ) ;
+ }
+
+ return id ;
+}
+
+
+// --- Implementation methods ---------------------------------------
+
+/*
+ *
+ */
+p<SessionInfo> Connection::createSessionInfo(AcknowledgementMode ackMode)
+{
+ p<SessionInfo> sessionInfo = new SessionInfo() ;
+ p<SessionId> sessionId = new SessionId() ;
+
+ sessionId->setConnectionId ( connectionInfo->getConnectionId()->getValue() ) ;
+
+ {
+ LOCKED_SCOPE( mutex );
+ sessionId->setValue( ++sessionCounter ) ;
+ }
+
+ sessionInfo->setSessionId( sessionId ) ;
+ return sessionInfo ;
+}
+
+/*
+ *
+ */
+void Connection::checkConnected() throw(CmsException)
+{
+ if( closed )
+ throw ConnectionClosedException("Oops! Connection already closed.") ;
+
+ if( !connected )
+ {
+ connected = true ;
+
+ // Send the connection info and see if we get an ack/nak
+ syncRequest( connectionInfo ) ;
+ }
+}
+
+/*
+ * Handle incoming commands.
+ */
+void Connection::onCommand(p<ITransport> transport, p<ICommand> command)
+{
+ if( command->getDataStructureType() == MessageDispatch::TYPE )
+ {
+ p<MessageDispatch> dispatch = p_cast<MessageDispatch> (command) ;
+ p<ConsumerId> consumerId = dispatch->getConsumerId() ;
+ p<MessageConsumer> consumer = NULL ;
+ list< p<ISession> >::const_iterator tempIter ;
+
+ // Iterate through all sessions and check if one has the consumer
+ for( tempIter = sessions.begin() ;
+ tempIter != sessions.end() ;
+ tempIter++ )
+ {
+ consumer = p_cast<Session> (*tempIter)->getConsumer(consumerId) ;
+
+ // Found a match?
+ if( consumer != NULL )
+ break ;
+ }
+ if( consumer == NULL )
+ cout << "ERROR: No such consumer active: " << consumerId->getValue() << endl ;
+ else
+ {
+ p<ActiveMQMessage> message = p_cast<ActiveMQMessage> (dispatch->getMessage()) ;
+ consumer->dispatch(message) ;
+ }
+ }
+ else if( command->getDataStructureType() == WireFormatInfo::TYPE )
+ this->brokerWireFormat = p_cast<WireFormatInfo> (command) ;
+
+ else if( command->getDataStructureType() == BrokerInfo::TYPE )
+ this->brokerInfo = p_cast<BrokerInfo> (command) ;
+
+ else
+ cout << "ERROR: Unknown command: " << command->getDataStructureType() << endl ;
+}
+
+/*
+ * Handle incoming broker errors.
+ */
+void Connection::onError(p<ITransport> transport, exception& error)
+{
+ if( listener != NULL )
+ this->listener->onException(error) ;
+ else
+ cout << "ERROR: Received a broker exception: " << error.what() << endl ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Connection.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_Connection_hpp_
+#define ActiveMQ_Connection_hpp_
+
+#include <iostream>
+#include <list>
+#include <string>
+#include <map>
+
+#include "cms/ISession.hpp"
+#include "cms/IConnection.hpp"
+#include "cms/IExceptionListener.hpp"
+#include "cms/CmsException.hpp"
+#include "activemq/ConnectionClosedException.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/command/BrokerInfo.hpp"
+#include "activemq/command/WireFormatInfo.hpp"
+#include "activemq/command/ExceptionResponse.hpp"
+#include "activemq/command/ConnectionInfo.hpp"
+#include "activemq/command/LocalTransactionId.hpp"
+#include "activemq/command/MessageDispatch.hpp"
+#include "activemq/command/SessionInfo.hpp"
+#include "activemq/command/SessionId.hpp"
+#include "activemq/command/ShutdownInfo.hpp"
+#include "activemq/transport/ITransport.hpp"
+#include "activemq/transport/ICommandListener.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace std;
+ using namespace ifr;
+ using namespace apache::cms;
+ using namespace apache::activemq::command;
+ using namespace apache::activemq::transport;
+ using namespace apache::ppr::thread;
+ using namespace apache::ppr::util;
+
+/*
+ *
+ */
+class Connection : public IConnection, public ICommandListener
+{
+private:
+ p<ConnectionInfo> connectionInfo ;
+ p<ITransport> transport ;
+ p<BrokerInfo> brokerInfo ; // from MQ broker
+ p<WireFormatInfo> brokerWireFormat ; // from MQ broker
+ p<IExceptionListener> listener ;
+ list< p<ISession> > sessions ;
+ bool connected,
+ closed ;
+ AcknowledgementMode acknowledgementMode ;
+ long long sessionCounter,
+ tempDestinationCounter,
+ localTransactionCounter ;
+ SimpleMutex mutex ;
+
+public:
+ // Constructors
+ Connection(p<ITransport> transport, p<ConnectionInfo> connectionInfo) ;
+ virtual ~Connection() ;
+
+ // Attribute methods
+ virtual void setExceptionListener(p<IExceptionListener> listener) ;
+ virtual p<IExceptionListener> getExceptionListener() ;
+ virtual p<ITransport> getTransport() ;
+ virtual void setTransport(p<ITransport> transport) ;
+ virtual p<string> getClientId() ;
+ virtual void setClientId(const char* value) throw (CmsException) ;
+ virtual p<BrokerInfo> getBrokerInfo() ;
+ virtual p<WireFormatInfo> getBrokerWireFormat() ;
+ virtual AcknowledgementMode getAcknowledgementMode() ;
+ virtual void setAcknowledgementMode(AcknowledgementMode mode) ;
+// virtual void addConsumer(p<ConsumerId> consumerId, p<MessageConsumer> messageConsumer) ;
+// virtual void removeConsumer(p<ConsumerId> consumerId) ;
+ virtual p<ConnectionId> getConnectionId() ;
+
+ // Operation methods
+ virtual p<ISession> createSession() throw(CmsException) ;
+ virtual p<ISession> createSession(AcknowledgementMode mode) throw(CmsException) ;
+ virtual p<Response> syncRequest(p<ICommand> command) throw(CmsException) ;
+ virtual void oneway(p<ICommand> command) throw(CmsException) ;
+ virtual void disposeOf(p<IDataStructure> dataStructure) throw(CmsException) ;
+ virtual p<string> createTemporaryDestinationName() ;
+ virtual p<LocalTransactionId> createLocalTransactionId() ;
+ virtual void close() ;
+
+protected:
+ // Implementation methods
+ p<SessionInfo> createSessionInfo(AcknowledgementMode mode) ;
+ void checkConnected() throw(CmsException) ;
+ void onCommand(p<ITransport> transport, p<ICommand> command) ;
+ void onError(p<ITransport> transport, exception& error) ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_Connection_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConnectionClosedException.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+ConnectionClosedException::ConnectionClosedException(const char* message)
+ : CmsException(message)
+{
+ // no-op
+}
+
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionClosedException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionClosedException_hpp_
+#define ActiveMQ_ConnectionClosedException_hpp_
+
+#include "cms/CmsException.hpp"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace apache::cms;
+
+/*
+ * Signals that a connection is being used when it is already closed.
+ */
+class ConnectionClosedException : public CmsException
+{
+public:
+ ConnectionClosedException(const char* message) ;
+};
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ConnectionClosedException_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionException_hpp_
+#define ActiveMQ_ConnectionException_hpp_
+
+#include "ppr/TraceException.hpp"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace std ;
+ using namespace apache::ppr;
+
+/*
+ * Signals that a connection error has occured.
+ */
+class ConnectionException : public TraceException
+{
+public:
+ ConnectionException() : TraceException()
+ { /* no-op */ } ;
+ ConnectionException(const char *const& msg) : TraceException(msg)
+ { /* no-op */ } ;
+ ConnectionException(const char* fileName, int lineNo, const char* msg) : TraceException(msg)
+ { /* no-op */ } ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ConnectionException_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConnectionFactory.hpp"
+#include "activemq/Connection.hpp"
+#include "activemq/protocol/openwire/OpenWireProtocol.hpp"
+#include "activemq/transport/tcp/TcpTransport.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+ConnectionFactory::ConnectionFactory()
+{
+ // Use default URI
+ brokerUri = new Uri ("tcp://localhost:61616?wireFormat=openwire") ;
+ username = NULL ;
+ password = NULL ;
+ clientId = Guid::getGuidString() ;
+ transportFactory = new TransportFactory() ;
+}
+
+/*
+ *
+ */
+ConnectionFactory::ConnectionFactory(p<Uri> brokerUri)
+{
+ this->brokerUri = brokerUri;
+ username = NULL ;
+ password = NULL ;
+ clientId = Guid::getGuidString() ;
+ transportFactory = new TransportFactory() ;
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ *
+ */
+p<Uri> ConnectionFactory::getBrokerUri()
+{
+ return brokerUri ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setBrokerUri(p<Uri> brokerUri)
+{
+ this->brokerUri = brokerUri ;
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getUsername()
+{
+ return username ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setUsername(const char* username)
+{
+ this->username = new string(username) ;
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getPassword()
+{
+ return password ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setPassword(const char* password)
+{
+ this->password = new string(password);
+
+}
+
+/*
+ *
+ */
+p<string> ConnectionFactory::getClientId()
+{
+ return clientId ;
+}
+
+/*
+ *
+ */
+void ConnectionFactory::setClientId(const char* clientId)
+{
+ this->clientId = new string(clientId) ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ *
+ */
+p<IConnection> ConnectionFactory::createConnection() throw (ConnectionException)
+{
+ return createConnection( (username != NULL) ? username->c_str() : NULL,
+ (password != NULL) ? password->c_str() : NULL ) ;
+}
+
+/*
+ *
+ */
+p<IConnection> ConnectionFactory::createConnection(const char* username, const char* password) throw (ConnectionException)
+{
+ p<ConnectionInfo> connectionInfo ;
+ p<ITransport> transport ;
+ p<Connection> connection ;
+
+
+ // Set up a new connection object
+ connectionInfo = createConnectionInfo(username, password) ;
+ transport = createTransport() ;
+ connection = new Connection(transport, connectionInfo) ;
+ connection->setClientId( clientId->c_str() ) ;
+
+ return connection ;
+}
+
+
+// --- Implementation methods ---------------------------------------
+
+/*
+ *
+ */
+p<ConnectionInfo> ConnectionFactory::createConnectionInfo(const char* username, const char* password)
+{
+ p<ConnectionInfo> connectionInfo = new ConnectionInfo() ;
+ p<ConnectionId> connectionId = new ConnectionId() ;
+ p<string> uid = (username != NULL) ? new string(username) : NULL ;
+ p<string> pwd = (password != NULL) ? new string(password) : NULL ;
+
+ connectionId->setValue( Guid::getGuidString() ) ;
+ connectionInfo->setConnectionId( connectionId ) ;
+ connectionInfo->setUserName( uid ) ;
+ connectionInfo->setPassword( pwd ) ;
+ connectionInfo->setClientId( clientId ) ;
+
+ return connectionInfo ;
+}
+
+/*
+ *
+ */
+p<ITransport> ConnectionFactory::createTransport() throw (ConnectionException)
+{
+ try
+ {
+ // Create a transport for given URI
+ return transportFactory->createTransport( brokerUri ) ;
+ }
+ catch( SocketException se )
+ {
+ throw ConnectionException(__FILE__, __LINE__, "Failed to connect socket") ;
+ }
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConnectionFactory.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConnectionFactory_hpp_
+#define ActiveMQ_ConnectionFactory_hpp_
+
+// Must be included before any STL includes
+#include "ppr/util/Guid.hpp"
+
+#include <string>
+#include "cms/IConnection.hpp"
+#include "cms/IConnectionFactory.hpp"
+#include "activemq/ConnectionException.hpp"
+#include "activemq/command/ConnectionInfo.hpp"
+#include "activemq/command/ConnectionId.hpp"
+#include "activemq/protocol/IProtocol.hpp"
+#include "activemq/transport/ITransport.hpp"
+#include "activemq/transport/ITransportFactory.hpp"
+#include "activemq/transport/TransportFactory.hpp"
+#include "ppr/net/Uri.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace apache::activemq::command;
+ using namespace apache::activemq::protocol;
+ using namespace apache::activemq::transport;
+ using namespace apache::ppr::net;
+ using namespace ifr;
+
+/*
+ *
+ */
+class ConnectionFactory : public IConnectionFactory
+{
+private:
+ p<Uri> brokerUri ;
+ p<string> username ;
+ p<string> password ;
+ p<string> clientId ;
+ p<IProtocol> protocol ;
+ p<ITransportFactory> transportFactory ;
+
+public:
+ // Constructors
+ ConnectionFactory() ;
+ ConnectionFactory(p<Uri> uri) ;
+
+ // Attribute methods
+ virtual p<Uri> getBrokerUri() ;
+ virtual void setBrokerUri(p<Uri> brokerUri) ;
+ virtual p<string> getUsername() ;
+ virtual void setUsername(const char* username) ;
+ virtual p<string> getPassword() ;
+ virtual void setPassword(const char* password) ;
+ virtual p<string> getClientId() ;
+ virtual void setClientId(const char* clientId) ;
+
+ // Operation methods
+ virtual p<IConnection> createConnection() throw (ConnectionException) ;
+ virtual p<IConnection> createConnection(const char* username, const char* password) throw (ConnectionException) ;
+
+protected:
+ // Implementation methods
+ virtual p<ConnectionInfo> createConnectionInfo(const char* username, const char* password) ;
+ virtual p<ITransport> createTransport() throw (ConnectionException) ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ConnectionFactory_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/ConsumerClosedException.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+ConsumerClosedException::ConsumerClosedException(const char* message)
+ : CmsException(message)
+{
+ // no-op
+}
+
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ConsumerClosedException.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ConsumerClosedException_hpp_
+#define ActiveMQ_ConsumerClosedException_hpp_
+
+#include "cms/CmsException.hpp"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace apache::cms;
+
+/*
+ * Signals that a consumer is being used when it is already closed.
+ */
+class ConsumerClosedException : public CmsException
+{
+public:
+ ConsumerClosedException(const char* message) ;
+};
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ConsumerClosedException_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/DestinationFilter.hpp"
+
+using namespace apache::activemq;
+
+// Init static constants
+const char* DestinationFilter::ANY_DESCENDENT = ">" ;
+const char* DestinationFilter::ANY_CHILD = "*" ;
+
+/*
+ *
+ */
+DestinationFilter::DestinationFilter()
+{
+ // no-op
+}
+
+/*
+ *
+ */
+DestinationFilter::~DestinationFilter()
+{
+ // no-op
+}
+
+/*
+ *
+ */
+bool DestinationFilter::matches(p<ActiveMQMessage> message)
+{
+ return matches( message->getDestination() ) ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/DestinationFilter.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_DestinationFilter_hpp_
+#define ActiveMQ_DestinationFilter_hpp_
+
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+ using namespace apache::activemq::command;
+
+/*
+ *
+ */
+class DestinationFilter
+{
+public:
+ const static char* ANY_DESCENDENT ;
+ const static char* ANY_CHILD ;
+
+public:
+ DestinationFilter() ;
+ virtual ~DestinationFilter() ;
+
+ virtual bool matches(p<ActiveMQMessage> message) ;
+ virtual bool matches(p<ActiveMQDestination> destination) = 0 ;
+};
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_DestinationFilter_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Dispatcher.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+Dispatcher::Dispatcher()
+{
+ dispatchQueue = new queue< p<IMessage> > ;
+ redeliverList = new list< p<IMessage> > ;
+}
+
+/*
+ *
+ */
+void Dispatcher::redeliverRolledBackMessages()
+{
+ LOCKED_SCOPE (mutex);
+
+ p<queue< p<IMessage> > > replacementQueue = new queue< p<IMessage> > ;
+ //(dispatchQueue->size() + redeliverList->size() ) ;
+
+ // Copy all messages to be redelivered to the new queue
+ while( !redeliverList->empty() )
+ {
+ replacementQueue->push( redeliverList->front() ) ;
+ redeliverList->pop_front() ;
+ }
+
+ // Copy all messages to be dispatched to the new queue
+ while( dispatchQueue->size() > 0 )
+ {
+ // Get first element in queue
+ p<IMessage> element = p_cast<IMessage> (dispatchQueue->front()) ;
+
+ // Remove first element from queue
+ dispatchQueue->pop() ;
+
+ // Add element to the new queue
+ replacementQueue->push(element) ;
+ }
+ // Switch to the new queue
+ dispatchQueue = replacementQueue ;
+
+ semaphore.notify() ;
+}
+
+/*
+ *
+ */
+void Dispatcher::redeliver(p<IMessage> message)
+{
+ LOCKED_SCOPE (mutex);
+ redeliverList->push_back(message) ;
+}
+
+/*
+ *
+ */
+void Dispatcher::enqueue(p<IMessage> message)
+{
+ LOCKED_SCOPE (mutex);
+ dispatchQueue->push(message) ;
+ semaphore.notify() ;
+}
+
+/*
+ *
+ */
+p<IMessage> Dispatcher::dequeueNoWait()
+{
+ p<IMessage> msg = NULL ;
+
+ {
+ LOCKED_SCOPE (mutex);
+
+ if( dispatchQueue->size() > 0 )
+ {
+ msg = p_cast<IMessage> (dispatchQueue->front()) ;
+ dispatchQueue->pop() ;
+ }
+ }
+ return msg ;
+}
+
+/*
+ *
+ */
+p<IMessage> Dispatcher::dequeue(int timeout)
+{
+ p<IMessage> msg = NULL ;
+
+ {
+ LOCKED_SCOPE (mutex);
+
+ if( dispatchQueue->size() == 0 )
+ semaphore.wait(timeout) ;
+
+ if( dispatchQueue->size() > 0 )
+ {
+ msg = p_cast<IMessage> (dispatchQueue->front()) ;
+ dispatchQueue->pop() ;
+ }
+ }
+ return msg ;
+}
+
+/*
+ *
+ */
+p<IMessage> Dispatcher::dequeue()
+{
+ p<IMessage> msg = NULL ;
+
+ {
+ LOCKED_SCOPE (mutex);
+
+ msg = p_cast<IMessage> (dispatchQueue->front()) ;
+ dispatchQueue->pop() ;
+ }
+
+ return msg ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Dispatcher.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_Dispatcher_hpp_
+#define ActiveMQ_Dispatcher_hpp_
+
+#include <string>
+#include <list>
+#include <queue>
+#include "cms/IMessage.hpp"
+#include "activemq/command/Response.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+ using namespace apache::activemq::command;
+ using namespace apache::cms;
+ using namespace apache::ppr::thread;
+
+/*
+ * Handles the multi-threaded dispatching between transport and consumers.
+ */
+class Dispatcher
+{
+private:
+ p<queue< p<IMessage> > > dispatchQueue ;
+ p<list< p<IMessage> > > redeliverList ;
+ SimpleMutex mutex ;
+ Semaphore semaphore ;
+
+public:
+ Dispatcher() ;
+ virtual ~Dispatcher() {}
+
+ virtual void redeliverRolledBackMessages() ;
+ virtual void redeliver(p<IMessage> message) ;
+ virtual void enqueue(p<IMessage> message) ;
+ virtual p<IMessage> dequeueNoWait() ;
+ virtual p<IMessage> dequeue(int timeout) ;
+ virtual p<IMessage> dequeue() ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_Dispatcher_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IAcknowledger.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_IAcknowledger_hpp_
+#define ActiveMQ_IAcknowledger_hpp_
+
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ namespace command
+ {
+ class ActiveMQMessage ;
+ }
+ using namespace ifr;
+ using namespace apache::activemq::command;
+
+/*
+ * Interface for message acknowledgers.
+ */
+struct IAcknowledger : Interface
+{
+ virtual void acknowledge(p<ActiveMQMessage> message) = 0 ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_IAcknowledger_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ICommand.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ICommand_hpp_
+#define ActiveMQ_ICommand_hpp_
+
+#include "activemq/IDataStructure.hpp"
+
+namespace apache
+{
+ namespace activemq
+ {
+
+/*
+ * An OpenWire command
+ */
+struct ICommand : IDataStructure
+{
+ virtual int getCommandId() = 0;
+ virtual void setCommandId(int value) = 0;
+ virtual bool getResponseRequired() = 0;
+ virtual void setResponseRequired(bool value) = 0;
+};
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ICommand_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/IDataStructure.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_IDataStructure_hpp_
+#define ActiveMQ_IDataStructure_hpp_
+
+#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ namespace protocol
+ {
+ struct IMarshaller ;
+ }
+ using namespace ifr;
+ using namespace apache::activemq::protocol;
+ using namespace apache::ppr::io;
+
+/*
+ * An OpenWire data structure.
+ */
+struct IDataStructure : Interface
+{
+ virtual unsigned char getDataStructureType() = 0 ;
+ virtual bool isMarshallAware() = 0 ;
+ virtual int marshal(p<IMarshaller> marshaller, int mode, p<IOutputStream> writer) = 0 ;
+ virtual void unmarshal(p<IMarshaller> marshaller, int mode, p<IInputStream> reader) = 0 ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_IDataStructure_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/ISynchronization.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_ISynchronization_hpp_
+#define ActiveMQ_ISynchronization_hpp_
+
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+
+/*
+ *
+ */
+struct ISynchronization : Interface
+{
+ // Called before a commit
+ virtual void beforeCommit() = 0 ;
+
+ // Called after a commit
+ virtual void afterCommit() = 0 ;
+
+ // Called after a transaction rollback
+ virtual void afterRollback() = 0 ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_ISynchronization_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageAckType.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageAckType_hpp_
+#define ActiveMQ_MessageAckType_hpp_
+
+namespace apache
+{
+ namespace activemq
+ {
+ enum MessageAckType
+ {
+ DeliveredAck = 0,
+ PoisonAck = 1,
+ ConsumedAck = 2
+ } ;
+ }
+}
+
+#endif /*ActiveMQ_MessageAckType_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+MessageConsumer::MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode)
+{
+ this->session = session ;
+ this->consumerInfo = consumerInfo ;
+ this->acknowledgementMode = acknowledgementMode ;
+ this->dispatcher = new Dispatcher() ;
+ this->listener = NULL ;
+ this->closed = false ;
+ this->maximumRedeliveryCount = 10 ;
+ this->redeliveryTimeout = 500 ;
+}
+
+/*
+ *
+ */
+MessageConsumer::~MessageConsumer()
+{
+ // Make sure consumer is closed
+ close() ;
+}
+
+// Attribute methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::setMessageListener(p<IMessageListener> listener)
+{
+ this->listener = listener ;
+}
+
+/*
+ *
+ */
+p<IMessageListener> MessageConsumer::getMessageListener()
+{
+ return listener ;
+}
+
+/*
+ *
+ */
+p<ConsumerId> MessageConsumer::getConsumerId()
+{
+ return consumerInfo->getConsumerId() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setMaximumRedeliveryCount(int count)
+{
+ this->maximumRedeliveryCount = count ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getMaximumRedeliveryCount()
+{
+ return maximumRedeliveryCount ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::setRedeliveryTimeout(int timeout)
+{
+ this->redeliveryTimeout = timeout ;
+}
+
+/*
+ *
+ */
+int MessageConsumer::getRedeliveryTimeout()
+{
+ return redeliveryTimeout ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive()
+{
+ checkClosed() ;
+ return autoAcknowledge( dispatcher->dequeue() ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receive(int timeout)
+{
+ checkClosed() ;
+ return autoAcknowledge( dispatcher->dequeue(timeout) ) ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::receiveNoWait()
+{
+ checkClosed() ;
+ return autoAcknowledge( dispatcher->dequeueNoWait() ) ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::redeliverRolledBackMessages()
+{
+ dispatcher->redeliverRolledBackMessages() ;
+}
+
+/*
+ * Transport callback that handles messages dispatching
+ */
+void MessageConsumer::dispatch(p<IMessage> message)
+{
+ dispatcher->enqueue(message) ;
+
+ // Activate background dispatch thread if async listener is set up
+ if( listener != NULL )
+ session->dispatch() ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::dispatchAsyncMessages()
+{
+ while( listener != NULL )
+ {
+ p<IMessage> message = dispatcher->dequeueNoWait() ;
+
+ if( message != NULL )
+ {
+ listener->onMessage(message) ;
+
+ // Auto acknowledge message if selected
+ autoAcknowledge(message) ;
+ }
+ else
+ break ;
+ }
+}
+
+/*
+ * IAcknowledger callback method.
+ */
+void MessageConsumer::acknowledge(p<ActiveMQMessage> message)
+{
+ doClientAcknowledge(message) ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::close()
+{
+ if( !closed )
+ {
+ closed = true ;
+
+ // De-register consumer from broker
+ session->getConnection()->disposeOf( consumerInfo->getConsumerId() ) ;
+
+ // Reset internal state (prevent cyclic references)
+ session = NULL ;
+ }
+}
+
+
+// Implementation methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageConsumer::checkClosed() throw(CmsException)
+{
+ if( closed )
+ throw ConnectionClosedException("Oops! Connection already closed") ;
+}
+
+/*
+ *
+ */
+p<IMessage> MessageConsumer::autoAcknowledge(p<IMessage> message)
+{
+ try
+ {
+ // Is the message an ActiveMQMessage? (throws bad_cast otherwise)
+ p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
+
+ // Register the handler for client acknowledgment
+ activeMessage->setAcknowledger( smartify(this) ) ;
+
+ if( acknowledgementMode != ClientAckMode )
+ doAcknowledge(activeMessage) ;
+ }
+ catch( bad_cast& bc )
+ {
+ // ignore
+ }
+ return message ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::doClientAcknowledge(p<ActiveMQMessage> message)
+{
+ if( acknowledgementMode == ClientAckMode )
+ doAcknowledge(message);
+}
+
+/*
+ *
+ */
+void MessageConsumer::doAcknowledge(p<Message> message)
+{
+ p<MessageAck> ack = createMessageAck(message) ;
+ //cout << "Sending Ack: " << ack->getAckType() << endl ;
+ session->getConnection()->syncRequest(ack) ;
+}
+
+/*
+ *
+ */
+p<MessageAck> MessageConsumer::createMessageAck(p<Message> message)
+{
+ p<MessageAck> ack = new MessageAck() ;
+
+ // Set ack properties
+ ack->setAckType( ConsumedAck ) ;
+ ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+ ack->setDestination( message->getDestination() ) ;
+ ack->setFirstMessageId( message->getMessageId() ) ;
+ ack->setLastMessageId( message->getMessageId() ) ;
+ ack->setMessageCount( 1 ) ;
+
+ if( session->isTransacted() )
+ {
+ session->doStartTransaction() ;
+ ack->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+ session->getTransactionContext()->addSynchronization( new MessageConsumerSynchronization(smartify(this), message) ) ;
+ }
+ return ack ;
+}
+
+/*
+ *
+ */
+void MessageConsumer::afterRollback(p<ActiveMQMessage> message)
+{
+ // Try redeliver of the message again
+ message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ) ;
+
+ // Check if redeliver count has exceeded maximum
+ if( message->getRedeliveryCounter() > maximumRedeliveryCount )
+ {
+ // Send back a poisoned pill
+ p<MessageAck> ack = new MessageAck() ;
+ ack->setAckType( PoisonAck ) ;
+ ack->setConsumerId( consumerInfo->getConsumerId() ) ;
+ ack->setDestination( message->getDestination() ) ;
+ ack->setFirstMessageId( message->getMessageId() ) ;
+ ack->setLastMessageId( message->getMessageId() ) ;
+ ack->setMessageCount( 1 ) ;
+ session->getConnection()->oneway(ack) ;
+ }
+ else
+ {
+ dispatcher->redeliver(message) ;
+
+ // Re-dispatch the message at some point in the future
+ if( listener != NULL )
+ session->dispatch( redeliveryTimeout ) ;
+ }
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumer.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumer_hpp_
+#define ActiveMQ_MessageConsumer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageConsumer.hpp"
+#include "cms/IMessageListener.hpp"
+#include "cms/CmsException.hpp"
+#include "activemq/AcknowledgementMode.hpp"
+#include "activemq/MessageAckType.hpp"
+#include "activemq/Dispatcher.hpp"
+#include "activemq/IAcknowledger.hpp"
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/ConnectionClosedException.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/Message.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "ppr/util/ifr/p"
+#include "ppr/thread/Thread.hpp"
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+ using namespace apache::cms;
+ using namespace apache::ppr::thread;
+ class Session ;
+
+/*
+ *
+ */
+class MessageConsumer : public IMessageConsumer, public IAcknowledger
+{
+private:
+ p<Session> session ;
+ p<ConsumerInfo> consumerInfo ;
+ p<Dispatcher> dispatcher ;
+ p<IMessageListener> listener ;
+ AcknowledgementMode acknowledgementMode ;
+ bool closed ;
+ int maximumRedeliveryCount,
+ redeliveryTimeout ;
+
+public:
+ MessageConsumer(p<Session> session, p<ConsumerInfo> consumerInfo, AcknowledgementMode acknowledgementMode) ;
+ virtual ~MessageConsumer() ;
+
+ virtual void setMessageListener(p<IMessageListener> listener) ;
+ virtual p<IMessageListener> getMessageListener() ;
+ virtual p<ConsumerId> getConsumerId() ;
+ virtual void setMaximumRedeliveryCount(int count) ;
+ virtual int getMaximumRedeliveryCount() ;
+ virtual void setRedeliveryTimeout(int timeout) ;
+ virtual int getRedeliveryTimeout() ;
+
+ virtual p<IMessage> receive() ;
+ virtual p<IMessage> receive(int timeout) ;
+ virtual p<IMessage> receiveNoWait() ;
+ virtual void redeliverRolledBackMessages() ;
+ virtual void dispatch(p<IMessage> message) ;
+ virtual void dispatchAsyncMessages() ;
+ virtual void afterRollback(p<ActiveMQMessage> message) ;
+ virtual void acknowledge(p<ActiveMQMessage> message) ;
+ virtual void close() ;
+
+protected:
+ void checkClosed() throw(CmsException) ;
+ p<IMessage> autoAcknowledge(p<IMessage> message) ;
+ void doClientAcknowledge(p<ActiveMQMessage> message) ;
+ void doAcknowledge(p<Message> message) ;
+ p<MessageAck> createMessageAck(p<Message> message) ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_IMessageConsumer_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageConsumerSynchronization.hpp"
+#include "activemq/MessageConsumer.hpp"
+
+using namespace apache::activemq;
+
+/*
+ *
+ */
+MessageConsumerSynchronization::MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message)
+{
+ this->consumer = consumer ;
+ this->message = message ;
+}
+
+/*
+ *
+ */
+MessageConsumerSynchronization::~MessageConsumerSynchronization()
+{
+}
+
+/*
+ *
+ */
+void MessageConsumerSynchronization::beforeCommit()
+{
+ // no-op
+}
+
+/*
+ *
+ */
+void MessageConsumerSynchronization::afterCommit()
+{
+ // no-op
+}
+
+/*
+ *
+ */
+void MessageConsumerSynchronization::afterRollback()
+{
+ consumer->afterRollback( p_cast<ActiveMQMessage> (message)) ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageConsumerSynchronization.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageConsumerSynchronization_hpp_
+#define ActiveMQ_MessageConsumerSynchronization_hpp_
+
+#include "activemq/ISynchronization.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/Message.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+ class MessageConsumer;
+
+/*
+ *
+ */
+class MessageConsumerSynchronization : public ISynchronization
+{
+private:
+ p<MessageConsumer> consumer ;
+ p<Message> message ;
+
+public:
+ MessageConsumerSynchronization(p<MessageConsumer> consumer, p<Message> message) ;
+ ~MessageConsumerSynchronization() ;
+
+ virtual void beforeCommit() ;
+ virtual void afterCommit() ;
+ virtual void afterRollback() ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_MessageConsumerSynchronization_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Session.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ *
+ */
+MessageProducer::MessageProducer(p<Session> session, p<ProducerInfo> producerInfo)
+{
+ this->session = session ;
+ this->producerInfo = producerInfo ;
+ this->priority = DEFAULT_PRIORITY ;
+ this->timeToLive = DEFAULT_TIMETOLIVE ;
+ this->messageCounter = 0 ;
+ this->persistent = false ;
+ this->disableMessageID = false ;
+ this->disableMessageTimestamp = false ;
+ this->closed = false ;
+}
+
+/*
+ *
+ */
+MessageProducer::~MessageProducer()
+{
+ // Make sure the producer is closed
+ close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+bool MessageProducer::getPersistent()
+{
+ return persistent ;
+}
+
+void MessageProducer::setPersistent(bool persistent)
+{
+ this->persistent = persistent ;
+}
+
+long long MessageProducer::getTimeToLive()
+{
+ return timeToLive ;
+}
+
+void MessageProducer::getTimeToLive(long long ttl)
+{
+ this->timeToLive = ttl ;
+}
+
+int MessageProducer::getPriority()
+{
+ return priority ;
+}
+
+void MessageProducer::getPriority(int priority)
+{
+ this->priority = priority ;
+}
+
+bool MessageProducer::getDisableMessageID()
+{
+ return disableMessageID ;
+}
+
+void MessageProducer::getDisableMessageID(bool disable)
+{
+ this->disableMessageID = disable ;
+}
+
+bool MessageProducer::getDisableMessageTimestamp()
+{
+ return disableMessageTimestamp ;
+}
+
+void MessageProducer::getDisableMessageTimestamp(bool disable)
+{
+ this->disableMessageTimestamp = disable ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+void MessageProducer::send(p<IMessage> message)
+{
+ send(producerInfo->getDestination(), message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
+}
+
+/*
+ *
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message)
+{
+ send(destination, message, DEFAULT_PRIORITY, DEFAULT_TIMETOLIVE) ;
+}
+
+/*
+ *
+ */
+void MessageProducer::send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive)
+{
+ p<MessageId> msgId = new MessageId() ;
+ msgId->setProducerId( producerInfo->getProducerId() ) ;
+
+ // Acquire next sequence id
+ {
+ LOCKED_SCOPE (mutex);
+ msgId->setProducerSequenceId( ++messageCounter ) ;
+ }
+
+ // Configure the message
+ p<ActiveMQMessage> activeMessage = p_dyncast<ActiveMQMessage> (message) ;
+ activeMessage->setMessageId( msgId ) ;
+ activeMessage->setProducerId( producerInfo->getProducerId() ) ;
+ activeMessage->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ;
+ activeMessage->setPriority(priority) ;
+
+ if( session->isTransacted() )
+ {
+ session->doStartTransaction() ;
+ activeMessage->setTransactionId( session->getTransactionContext()->getTransactionId() ) ;
+ }
+
+ // Set time values if not disabled
+ if( !this->disableMessageTimestamp )
+ {
+ long long timestamp = Time::getCurrentTimeMillis() ;
+
+ // Set message time stamp/expiration
+ activeMessage->setTimestamp(timestamp) ;
+ if( timeToLive > 0 )
+ activeMessage->setExpiration( timestamp + timeToLive ) ;
+ }
+
+ // Finally, transmit the message
+ session->doSend(destination, message) ;
+}
+
+/*
+ *
+ */
+void MessageProducer::close()
+{
+ if( !closed )
+ {
+ closed = true ;
+
+ // De-register producer from broker
+ session->getConnection()->disposeOf( producerInfo->getProducerId() ) ;
+
+ // Reset internal state (prevent cyclic references)
+ session = NULL ;
+ }
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/MessageProducer.hpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ActiveMQ_MessageProducer_hpp_
+#define ActiveMQ_MessageProducer_hpp_
+
+#include <string>
+#include "cms/IDestination.hpp"
+#include "cms/IMessage.hpp"
+#include "cms/IMessageProducer.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/Time.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace activemq
+ {
+ using namespace ifr;
+ using namespace apache::cms;
+ using namespace apache::activemq::command;
+ using namespace apache::ppr;
+ using namespace apache::ppr::thread;
+ class Session ;
+
+/*
+ *
+ */
+class MessageProducer : public IMessageProducer
+{
+private:
+ p<Session> session ;
+ p<ProducerInfo> producerInfo ;
+ SimpleMutex mutex ;
+ int priority ;
+ long long messageCounter,
+ timeToLive ;
+ bool closed,
+ persistent,
+ disableMessageID,
+ disableMessageTimestamp ;
+
+ // Default message values
+ const static char DEFAULT_PRIORITY = 4 ;
+ const static long long DEFAULT_TIMETOLIVE = 0 ;
+
+public:
+ MessageProducer(p<Session> session, p<ProducerInfo> producerInfo) ;
+ virtual ~MessageProducer() ;
+
+ // Attribute methods
+ virtual bool getPersistent() ;
+ virtual void setPersistent(bool persistent) ;
+ virtual long long getTimeToLive() ;
+ virtual void getTimeToLive(long long ttl) ;
+ virtual int getPriority() ;
+ virtual void getPriority(int priority) ;
+ virtual bool getDisableMessageID() ;
+ virtual void getDisableMessageID(bool disable) ;
+ virtual bool getDisableMessageTimestamp() ;
+ virtual void getDisableMessageTimestamp(bool disable) ;
+
+ // Operation methods
+ virtual void send(p<IMessage> message) ;
+ virtual void send(p<IDestination> destination, p<IMessage> message) ;
+ virtual void send(p<IDestination> destination, p<IMessage> message, char priority, long long timeToLive) ;
+ virtual void close() ;
+
+private:
+ long long getCurrentTimeMillis() ;
+} ;
+
+/* namespace */
+ }
+}
+
+#endif /*ActiveMQ_MessageProducer_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp?rev=397654&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/Session.cpp Thu Apr 27 14:59:28 2006
@@ -0,0 +1,508 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "activemq/Session.hpp"
+#include "activemq/command/ActiveMQDestination.hpp"
+#include "activemq/command/ActiveMQQueue.hpp"
+#include "activemq/command/ActiveMQTopic.hpp"
+#include "activemq/command/ActiveMQTempQueue.hpp"
+#include "activemq/command/ActiveMQTempTopic.hpp"
+#include "activemq/command/ActiveMQMessage.hpp"
+#include "activemq/command/ActiveMQBytesMessage.hpp"
+#include "activemq/command/ActiveMQMapMessage.hpp"
+#include "activemq/command/ActiveMQTextMessage.hpp"
+#include "activemq/command/ProducerInfo.hpp"
+#include "activemq/command/ConsumerInfo.hpp"
+#include "activemq/command/MessageAck.hpp"
+#include "activemq/MessageConsumer.hpp"
+#include "activemq/MessageProducer.hpp"
+#include "activemq/Connection.hpp"
+
+using namespace apache::activemq;
+
+
+// Constructors -----------------------------------------------------
+
+/*
+ *
+ */
+Session::Session(p<Connection> connection, p<SessionInfo> info, AcknowledgementMode ackMode)
+{
+ this->connection = connection ;
+ this->sessionInfo = info ;
+ this->ackMode = ackMode ;
+ this->prefetchSize = 1000 ;
+ this->consumerCounter = 0 ;
+ this->producerCounter = 0 ;
+ this->transactionContext = new TransactionContext(smartify(this)) ;
+ this->dispatchThread = new DispatchThread(smartify(this)) ;
+ this->closed = false ;
+
+ // Activate backround dispatch thread
+ dispatchThread->start() ;
+}
+
+/*
+ *
+ */
+Session::~Session()
+{
+ // Make sure session is closed
+ close() ;
+}
+
+
+// Attribute methods ------------------------------------------------
+
+/*
+ *
+ */
+bool Session::isTransacted()
+{
+ return ( ackMode == TransactionalAckMode ) ? true : false ;
+}
+
+/*
+ *
+ */
+p<Connection> Session::getConnection()
+{
+ return connection ;
+}
+
+/*
+ *
+ */
+p<SessionId> Session::getSessionId()
+{
+ return sessionInfo->getSessionId() ;
+}
+
+/*
+ *
+ */
+p<TransactionContext> Session::getTransactionContext()
+{
+ return transactionContext ;
+}
+
+/*
+ *
+ */
+p<MessageConsumer> Session::getConsumer(p<ConsumerId> consumerId)
+{
+ map<long long, p<MessageConsumer> >::iterator tempIter ;
+
+ // Check if key exists in map
+ tempIter = consumers.find( consumerId->getValue() ) ;
+ if( tempIter == consumers.end() )
+ return NULL ;
+ else
+ return tempIter->second ;
+}
+
+/*
+ *
+ */
+p<MessageProducer> Session::getProducer(p<ProducerId> producerId)
+{
+ map<long long, p<MessageProducer> >::iterator tempIter ;
+
+ // Check if key exists in map
+ tempIter = producers.find( producerId->getValue() ) ;
+ if( tempIter == producers.end() )
+ return NULL ;
+ else
+ return tempIter->second ;
+}
+
+
+// Operation methods ------------------------------------------------
+
+/*
+ *
+ */
+p<IMessageProducer> Session::createProducer()
+{
+ return createProducer(NULL) ;
+}
+
+/*
+ *
+ */
+p<IMessageProducer> Session::createProducer(p<IDestination> destination)
+{
+ p<ProducerInfo> command = createProducerInfo(destination) ;
+ p<ProducerId> producerId = command->getProducerId() ;
+
+ try
+ {
+ p<MessageProducer> producer = new MessageProducer(smartify(this), command) ;
+
+ // Save the producer
+ producers[ producerId->getValue() ] = producer ;
+
+ // Register producer with broker
+ connection->syncRequest(command) ;
+
+ return producer ;
+ }
+ catch( exception e )
+ {
+ // Make sure producer was removed
+ producers[ producerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+/*
+ *
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination)
+{
+ return createConsumer(destination, NULL) ;
+}
+
+/*
+ *
+ */
+p<IMessageConsumer> Session::createConsumer(p<IDestination> destination, const char* selector)
+{
+ p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
+ p<ConsumerId> consumerId = command->getConsumerId() ;
+
+ try
+ {
+ p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+ // Save the consumer first in case message dispatching starts immediately
+ consumers[ consumerId->getValue() ] = consumer ;
+
+ // Register consumer with broker
+ connection->syncRequest(command) ;
+
+ return consumer ;
+ }
+ catch( exception e )
+ {
+ // Make sure consumer was removed
+ consumers[ consumerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+p<IMessageConsumer> Session::createDurableConsumer(p<ITopic> destination, const char* name, const char* selector, bool noLocal)
+{
+ p<ConsumerInfo> command = createConsumerInfo(destination, selector) ;
+ p<ConsumerId> consumerId = command->getConsumerId() ;
+ p<string> subscriptionName = new string(name) ;
+
+ command->setSubcriptionName( subscriptionName ) ;
+ command->setNoLocal( noLocal ) ;
+
+ try
+ {
+ p<MessageConsumer> consumer = new MessageConsumer(smartify(this), command, ackMode) ;
+
+ // Save the consumer first in case message dispatching starts immediately
+ consumers[ consumerId->getValue() ] = consumer ;
+
+ // Register consumer with broker
+ connection->syncRequest(command) ;
+
+ return consumer ;
+ }
+ catch( exception e )
+ {
+ // Make sure consumer was removed
+ consumers[ consumerId->getValue() ] = NULL ;
+ throw e ;
+ }
+}
+
+/*
+ *
+ */
+p<IQueue> Session::getQueue(const char* name)
+{
+ p<IQueue> queue = new ActiveMQQueue(name) ;
+ return queue ;
+}
+
+/*
+ *
+ */
+p<ITopic> Session::getTopic(const char* name)
+{
+ p<ITopic> topic = new ActiveMQTopic(name) ;
+ return topic ;
+}
+
+/*
+ *
+ */
+p<ITemporaryQueue> Session::createTemporaryQueue()
+{
+ p<ITemporaryQueue> queue = new ActiveMQTempQueue( connection->createTemporaryDestinationName()->c_str() ) ;
+ return queue ;
+}
+
+/*
+ *
+ */
+p<ITemporaryTopic> Session::createTemporaryTopic()
+{
+ p<ITemporaryTopic> topic = new ActiveMQTempTopic( connection->createTemporaryDestinationName()->c_str() ) ;
+ return topic ;
+}
+
+/*
+ *
+ */
+p<IMessage> Session::createMessage()
+{
+ p<IMessage> message = new ActiveMQMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IBytesMessage> Session::createBytesMessage()
+{
+ p<IBytesMessage> message = new ActiveMQBytesMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IBytesMessage> Session::createBytesMessage(char* body, int size)
+{
+ p<IBytesMessage> message = new ActiveMQBytesMessage( body, size ) ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<IMapMessage> Session::createMapMessage()
+{
+ p<IMapMessage> message = new ActiveMQMapMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<ITextMessage> Session::createTextMessage()
+{
+ p<ITextMessage> message = new ActiveMQTextMessage() ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+p<ITextMessage> Session::createTextMessage(const char* text)
+{
+ p<ITextMessage> message = new ActiveMQTextMessage(text) ;
+ configure(message) ;
+ return message ;
+}
+
+/*
+ *
+ */
+void Session::commit() throw(CmsException)
+{
+ if( !isTransacted() )
+ throw CmsException("You cannot perform a commit on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+ transactionContext->commit() ;
+}
+
+/*
+ *
+ */
+void Session::rollback() throw(CmsException)
+{
+ if( !isTransacted() )
+ throw CmsException("You cannot perform a rollback on a non-transacted session. Acknowlegement mode is: " + ackMode) ;
+
+ transactionContext->rollback() ;
+
+ map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+ // Ensure all the consumers redeliver any rolled back messages
+ for( tempIter = consumers.begin() ;
+ tempIter != consumers.end() ;
+ tempIter++ )
+ {
+ ((*tempIter).second)->redeliverRolledBackMessages() ;
+ }
+}
+
+/*
+ *
+ */
+void Session::doSend(p<IDestination> destination, p<IMessage> message)
+{
+ p<ActiveMQMessage> command = p_dyncast<ActiveMQMessage> (message) ;
+ // TODO complete packet
+ connection->syncRequest(command) ;
+}
+
+/*
+ * Starts a new transaction
+ */
+void Session::doStartTransaction()
+{
+ if( isTransacted() )
+ transactionContext->begin() ;
+}
+
+/*
+ *
+ */
+void Session::dispatch(int delay)
+{
+ if( delay > 0 )
+ dispatchThread->sleep(delay) ;
+
+ dispatchThread->wakeup() ;
+}
+
+/*
+ *
+ */
+void Session::dispatchAsyncMessages()
+{
+ // Ensure that only 1 thread dispatches messages in a consumer at once
+ LOCKED_SCOPE (mutex);
+
+ map<long long, p<MessageConsumer> >::const_iterator tempIter ;
+
+ // Iterate through each consumer created by this session
+ // ensuring that they have all pending messages dispatched
+ for( tempIter = consumers.begin() ;
+ tempIter != consumers.end() ;
+ tempIter++ )
+ {
+ ((*tempIter).second)->dispatchAsyncMessages() ;
+ }
+}
+
+/*
+ *
+ */
+void Session::close()
+{
+ if( !closed )
+ {
+ map<long long, p<MessageConsumer> >::iterator consumerIter ;
+ map<long long, p<MessageProducer> >::iterator producerIter ;
+
+ // Iterate through all consumers and close them down
+ for( consumerIter = consumers.begin() ;
+ consumerIter != consumers.end() ;
+ consumerIter++ )
+ {
+ consumerIter->second->close() ;
+ consumerIter->second = NULL ;
+ }
+
+ // Iterate through all producers and close them down
+ for( producerIter = producers.begin() ;
+ producerIter != producers.end() ;
+ producerIter++ )
+ {
+ producerIter->second->close() ;
+ producerIter->second = NULL ;
+ }
+ // De-register session from broker
+ connection->disposeOf( sessionInfo->getSessionId() ) ;
+
+ // Clean up
+ connection = NULL ;
+ closed = true ;
+ }
+}
+
+
+// Implementation methods ------------------------------------------
+
+/*
+ *
+ */
+p<ConsumerInfo> Session::createConsumerInfo(p<IDestination> destination, const char* selector)
+{
+ p<ConsumerInfo> consumerInfo = new ConsumerInfo() ;
+ p<ConsumerId> consumerId = new ConsumerId() ;
+
+ consumerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+ consumerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+ {
+ LOCKED_SCOPE (mutex);
+ consumerId->setValue( ++consumerCounter ) ;
+ }
+ p<string> sel = ( selector == NULL ) ? NULL : new string(selector) ;
+
+ // TODO complete packet
+ consumerInfo->setConsumerId( consumerId ) ;
+ consumerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+ consumerInfo->setSelector( sel ) ;
+ consumerInfo->setPrefetchSize( this->prefetchSize ) ;
+
+ return consumerInfo ;
+}
+
+/*
+ *
+ */
+p<ProducerInfo> Session::createProducerInfo(p<IDestination> destination)
+{
+ p<ProducerInfo> producerInfo = new ProducerInfo() ;
+ p<ProducerId> producerId = new ProducerId() ;
+
+ producerId->setConnectionId( sessionInfo->getSessionId()->getConnectionId() ) ;
+ producerId->setSessionId( sessionInfo->getSessionId()->getValue() ) ;
+
+ {
+ LOCKED_SCOPE (mutex);
+ producerId->setValue( ++producerCounter ) ;
+ }
+
+ // TODO complete packet
+ producerInfo->setProducerId( producerId ) ;
+ producerInfo->setDestination( p_dyncast<ActiveMQDestination> (destination) ) ; //ActiveMQDestination::transform(destination) ) ;
+
+ return producerInfo ;
+}
+
+/*
+ * Configures the message command.
+ */
+void Session::configure(p<IMessage> message)
+{
+ // TODO:
+}