You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/18 21:06:59 UTC
svn commit: r548448 - in /activemq/activemq-cpp/trunk/src/main/activemq:
connector/openwire/ connector/stomp/ transport/
Author: tabish
Date: Mon Jun 18 12:06:58 2007
New Revision: 548448
URL: http://svn.apache.org/viewvc?view=rev&rev=548448
Log:
https://issues.apache.org/activemq/browse/AMQCPP-130
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp Mon Jun 18 12:06:58 2007
@@ -74,20 +74,18 @@
}
////////////////////////////////////////////////////////////////////////////////
-Command* OpenWireResponseBuilder::buildIncomingCommand(
- const transport::Command* command ){
+void OpenWireResponseBuilder::buildIncomingCommands(
+ const transport::Command* command, util::Queue<transport::Command*>& queue ){
// Delegate this to buildResponse
if( command->isResponseRequired() ) {
- return buildResponse( command );
+ queue.push( buildResponse( command ) );
}
if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
// Return a copy of the callers own requested WireFormatInfo
// so they get exactly the settings they asked for.
- return command->cloneCommand();
+ queue.push( command->cloneCommand() );
}
-
- return NULL;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h Mon Jun 18 12:06:58 2007
@@ -38,8 +38,9 @@
virtual ~OpenWireResponseBuilder(){}
- virtual transport::Response* buildResponse( const transport::Command* cmd );
- virtual transport::Command* buildIncomingCommand( const transport::Command* cmd );
+ virtual transport::Response* buildResponse( const transport::Command* command );
+ virtual void buildIncomingCommands(
+ const transport::Command* command, util::Queue<transport::Command*>& queue );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp Mon Jun 18 12:06:58 2007
@@ -40,26 +40,22 @@
}
////////////////////////////////////////////////////////////////////////////////
-Command* StompResponseBuilder::buildIncomingCommand( const transport::Command* cmd ){
+void StompResponseBuilder::buildIncomingCommands(
+ const transport::Command* command, util::Queue<transport::Command*>& queue ){
const commands::ConnectCommand* connectCommand =
- dynamic_cast<const commands::ConnectCommand*>(cmd);
+ dynamic_cast<const commands::ConnectCommand*>( command );
- if( connectCommand != NULL ){
+ if( connectCommand != NULL ) {
commands::ConnectedCommand* resp = new commands::ConnectedCommand();
resp->setCorrelationId( connectCommand->getCommandId() );
- if( connectCommand->getClientId() == NULL )
- {
+ if( connectCommand->getClientId() == NULL ) {
resp->setSessionId( sessionId );
- }
- else
- {
+ } else {
resp->setSessionId( connectCommand->getClientId() );
}
- return resp;
+ queue.push( resp );
}
-
- return NULL;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h Mon Jun 18 12:06:58 2007
@@ -39,7 +39,8 @@
virtual ~StompResponseBuilder(){}
virtual transport::Response* buildResponse( const transport::Command* cmd );
- virtual transport::Command* buildIncomingCommand( const transport::Command* cmd );
+ virtual void buildIncomingCommands(
+ const transport::Command* cmd, util::Queue<transport::Command*>& queue );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Mon Jun 18 12:06:58 2007
@@ -27,6 +27,7 @@
#include <activemq/concurrent/Mutex.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/util/Config.h>
+#include <activemq/util/Queue.h>
#include <activemq/concurrent/CountDownLatch.h>
namespace activemq{
@@ -55,8 +56,23 @@
public:
virtual ~ResponseBuilder(){}
- virtual Response* buildResponse( const Command* cmd ) = 0;
- virtual Command* buildIncomingCommand( const Command* cmd ) = 0;
+ /**
+ * Given a Command, check if it requires a response and return the
+ * appropriate Response that the Broker would send for this Command
+ * @param command - The command to build a response for
+ * @return A Response object pointer, or NULL if no response.
+ */
+ virtual Response* buildResponse( const Command* command ) = 0;
+
+ /**
+ * When called the ResponseBuilder must construct all the
+ * Responses or Asynchronous commands that would be sent to
+ * this client by the Broker upon receipt of the passed command.
+ * @param command - The Command being sent to the Broker.
+ * @param queue - Queue of Commands sent back from the broker.
+ */
+ virtual void buildIncomingCommands(
+ const Command* cmd, util::Queue<Command*>& queue ) = 0;
};
/**
@@ -74,17 +90,13 @@
MockTransport* transport;
ResponseBuilder* responseBuilder;
- concurrent::Mutex mutex;
- Command* command;
- Command* response;
bool done;
concurrent::CountDownLatch startedLatch;
+ util::Queue<Command*> inboundQueue;
public:
InternalCommandListener(void) : startedLatch(1) {
- command = NULL;
- response = NULL;
transport = NULL;
responseBuilder = NULL;
done = false;
@@ -95,13 +107,14 @@
virtual ~InternalCommandListener() {
done = true;
- synchronized( &mutex )
- {
- mutex.notifyAll();
+ synchronized( &inboundQueue ) {
+ inboundQueue.notifyAll();
}
this->join();
- delete response;
+ while( !inboundQueue.empty() ) {
+ delete inboundQueue.pop();
+ }
}
void setTransport( MockTransport* transport ){
@@ -112,44 +125,38 @@
this->responseBuilder = responseBuilder;
}
- virtual void onCommand( Command* command )
- {
- synchronized( &mutex )
+ virtual void onCommand( Command* command ) {
+ synchronized( &inboundQueue )
{
- this->command = command;
// Create a response now before the caller has a
// chance to destroy the command.
- this->response =
- responseBuilder->buildIncomingCommand( command );
+ responseBuilder->buildIncomingCommands( command, inboundQueue );
- mutex.notifyAll();
+ // Wake up the thread, messages are dispatched from there.
+ inboundQueue.notifyAll();
}
}
- void run(void)
- {
- try
- {
- synchronized( &mutex )
- {
- while( !done )
- {
+ void run(void) {
+ try {
+
+ synchronized( &inboundQueue ) {
+
+ while( !done ) {
startedLatch.countDown();
- mutex.wait();
- if( command == NULL )
- {
+ while( inboundQueue.empty() && !done ){
+ inboundQueue.wait();
+ }
+
+ if( done || transport == NULL ) {
continue;
}
// If we created a response then send it.
- if( response != NULL && transport != NULL )
- {
- transport->fireCommand( this->response );
+ while( !inboundQueue.empty() ) {
+ transport->fireCommand( inboundQueue.pop() );
}
-
- this->response = NULL;
- this->command = NULL;
}
}
}