You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/05/04 17:14:43 UTC
svn commit: r771330 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat:
openwire/marshal/PrimitiveMapMarshaller.cpp stomp/marshal/Marshaler.cpp
stomp/marshal/MarshalerHelper.cpp
Author: tabish
Date: Mon May 4 15:14:42 2009
New Revision: 771330
URL: http://svn.apache.org/viewvc?rev=771330&view=rev
Log:
Somewhat functional Stomp WireFormat. Client Ack doesn't seem to work; auto ack works fine. Transactions are untested.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp Mon May 4 15:14:42 2009
@@ -160,47 +160,47 @@
try {
- if( value.getValueType() == PrimitiveValueNode::BOOLEAN_TYPE ) {
+ if( value.getType() == PrimitiveValueNode::BOOLEAN_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::BOOLEAN_TYPE );
dataOut.writeBoolean( value.getBool() );
- } else if( value.getValueType() == PrimitiveValueNode::BYTE_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::BYTE_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::BYTE_TYPE );
dataOut.writeByte( value.getByte() );
- } else if( value.getValueType() == PrimitiveValueNode::CHAR_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::CHAR_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::CHAR_TYPE );
dataOut.writeChar( value.getChar() );
- } else if( value.getValueType() == PrimitiveValueNode::SHORT_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::SHORT_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::SHORT_TYPE );
dataOut.writeShort( value.getShort() );
- } else if( value.getValueType() == PrimitiveValueNode::INTEGER_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::INTEGER_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::INTEGER_TYPE );
dataOut.writeInt( value.getInt() );
- } else if( value.getValueType() == PrimitiveValueNode::LONG_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::LONG_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::LONG_TYPE );
dataOut.writeLong( value.getLong() );
- } else if( value.getValueType() == PrimitiveValueNode::FLOAT_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::FLOAT_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::FLOAT_TYPE );
dataOut.writeFloat( value.getFloat() );
- } else if( value.getValueType() == PrimitiveValueNode::DOUBLE_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::DOUBLE_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::DOUBLE_TYPE );
dataOut.writeDouble( value.getDouble() );
- } else if( value.getValueType() == PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::BYTE_ARRAY_TYPE );
@@ -209,7 +209,7 @@
dataOut.writeInt( (int)data.size() );
dataOut.write( data );
- } else if( value.getValueType() == PrimitiveValueNode::STRING_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::STRING_TYPE ) {
std::string data = value.getString();
@@ -222,12 +222,12 @@
dataOut.writeUTF( data );
}
- } else if( value.getValueType() == PrimitiveValueNode::LIST_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::LIST_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::LIST_TYPE );
marshalPrimitiveList( dataOut, value.getList() );
- } else if( value.getValueType() == PrimitiveValueNode::MAP_TYPE ) {
+ } else if( value.getType() == PrimitiveValueNode::MAP_TYPE ) {
dataOut.writeByte( PrimitiveValueNode::MAP_TYPE );
marshalPrimitiveMap( dataOut, value.getMap() );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp Mon May 4 15:14:42 2009
@@ -28,7 +28,9 @@
#include <activemq/commands/Response.h>
#include <activemq/commands/BrokerError.h>
#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
#include <activemq/commands/ConnectionInfo.h>
+#include <activemq/commands/ConsumerId.h>
#include <activemq/commands/ExceptionResponse.h>
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/RemoveInfo.h>
@@ -41,7 +43,10 @@
#include <decaf/lang/exceptions/ClassCastException.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
#include <decaf/io/IOException.h>
+#include <decaf/io/ByteArrayOutputStream.h>
+#include <decaf/io/DataOutputStream.h>
using namespace activemq;
using namespace activemq::core;
@@ -120,27 +125,35 @@
////////////////////////////////////////////////////////////////////////////////
Pointer<Command> Marshaler::unmarshalMessage( const Pointer<StompFrame>& frame ) {
- Pointer<Message> message;
+ Pointer<MessageDispatch> messageDispatch( new MessageDispatch() );
- // Convert from the Frame to standard message properties.
- helper.convertProperties( frame, message );
-
- // Check for Content length, that tells us if its a Text or Bytes Message
- if( frame->removeProperty( StompCommandConstants::HEADER_CONTENTLENGTH ) != "" ) {
- message.reset( new ActiveMQBytesMessage() );
+ // We created a unique id when we registered the subscription for the consumer;
+ // now convert it back to a ConsumerId so the ActiveMQConnection can dispatch the
+ // message correctly.
+ Pointer<ConsumerId> consumerId = helper.convertConsumerId(
+ frame->removeProperty( StompCommandConstants::HEADER_SUBSCRIPTION ) );
+ messageDispatch->setConsumerId( consumerId );
+
+ if( frame->hasProperty( StompCommandConstants::HEADER_CONTENTLENGTH ) ) {
+
+ Pointer<ActiveMQBytesMessage> message( new ActiveMQBytesMessage() );
+ frame->removeProperty( StompCommandConstants::HEADER_CONTENTLENGTH );
+ helper.convertProperties( frame, message );
message->setContent( frame->getBody() );
+ messageDispatch->setMessage( message );
+ messageDispatch->setDestination( message->getDestination() );
+
} else {
- Pointer<ActiveMQTextMessage> txtMessage( new ActiveMQTextMessage() );
- if( frame->getBodyLength() > 0 ) {
- std::string text( (char*)( &(frame->getBody()[0]) ), frame->getBodyLength() );
- txtMessage->setText( text );
- }
+ Pointer<ActiveMQTextMessage> message( new ActiveMQTextMessage() );
+ helper.convertProperties( frame, message );
+ message->setText( (char*)&(frame->getBody()[0]) );
+ messageDispatch->setMessage( message );
+ messageDispatch->setDestination( message->getDestination() );
- message = txtMessage.dynamicCast<Message>();
}
- return Pointer<Command>();
+ return messageDispatch;
}
////////////////////////////////////////////////////////////////////////////////
@@ -201,6 +214,11 @@
Pointer<StompFrame> frame( new StompFrame() );
frame->setCommand( StompCommandConstants::SEND );
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
// Convert the standard headers to the Stomp Format.
helper.convertProperties( message, frame );
@@ -208,13 +226,15 @@
try{
Pointer<ActiveMQTextMessage> txtMessage = message.dynamicCast<ActiveMQTextMessage>();
std::string text = txtMessage->getText();
- frame->setBody( (unsigned char*)text.c_str(), text.length() );
+ frame->setBody( (unsigned char*)text.c_str(), text.length() + 1 );
return frame;
} catch( ClassCastException& ex ) {}
try{
Pointer<ActiveMQBytesMessage> bytesMessage = message.dynamicCast<ActiveMQBytesMessage>();
frame->setBody( bytesMessage->getBodyBytes(), bytesMessage->getBodyLength() );
+ frame->setProperty( StompCommandConstants::HEADER_CONTENTLENGTH,
+ Long::toString( bytesMessage->getBodyLength() ) );
return frame;
} catch( ClassCastException& ex ) {}
@@ -231,6 +251,12 @@
Pointer<StompFrame> frame( new StompFrame() );
frame->setCommand( StompCommandConstants::ACK );
+
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
frame->setProperty( StompCommandConstants::HEADER_MESSAGEID,
helper.convertMessageId( ack->getLastMessageId() ) );
@@ -239,7 +265,7 @@
helper.convertTransactionId( ack->getTransactionId() ) );
}
- return frame;
+ return Pointer<StompFrame>(); //frame;
}
////////////////////////////////////////////////////////////////////////////////
@@ -274,6 +300,11 @@
frame->setCommand( StompCommandConstants::COMMIT );
}
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
frame->setProperty( StompCommandConstants::HEADER_TRANSACTIONID,
helper.convertTransactionId( info->getTransactionId() ) );
@@ -284,7 +315,13 @@
Pointer<StompFrame> Marshaler::marshalShutdownInfo( const Pointer<Command>& command AMQCPP_UNUSED ) {
Pointer<StompFrame> frame( new StompFrame() );
- frame->setCommand( StompCommandConstants::CONNECT );
+ frame->setCommand( StompCommandConstants::DISCONNECT );
+
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
return frame;
}
@@ -295,6 +332,11 @@
Pointer<StompFrame> frame( new StompFrame() );
frame->setCommand( StompCommandConstants::UNSUBSCRIBE );
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
try{
Pointer<ConsumerId> id = info->getObjectId().dynamicCast<ConsumerId>();
frame->setProperty( StompCommandConstants::HEADER_ID, helper.convertConsumerId( id ) );
@@ -312,8 +354,17 @@
Pointer<StompFrame> frame( new StompFrame() );
frame->setCommand( StompCommandConstants::SUBSCRIBE );
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
frame->setProperty( StompCommandConstants::HEADER_DESTINATION,
helper.convertDestination( info->getDestination() ) );
+
+ // This creates a unique Id for this consumer using the connection id, session id and
+ // the consumer's id value; when we get a message, this Id will be embedded in the
+ // Message's "subscription" property.
frame->setProperty( StompCommandConstants::HEADER_ID,
helper.convertConsumerId( info->getConsumerId() ) );
@@ -327,7 +378,7 @@
info->getSelector() );
}
- frame->setProperty( StompCommandConstants::HEADER_ACK, "client" );
+ frame->setProperty( StompCommandConstants::HEADER_ACK, "auto" );
if( info->isNoLocal() ) {
frame->setProperty( StompCommandConstants::HEADER_NOLOCAL, "true" );
@@ -361,6 +412,11 @@
Pointer<StompFrame> frame( new StompFrame() );
frame->setCommand( StompCommandConstants::UNSUBSCRIBE );
+ if( command->isResponseRequired() ) {
+ frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+ Integer::toString( command->getCommandId() ) );
+ }
+
frame->setProperty( StompCommandConstants::HEADER_ID, info->getClientId() );
frame->setProperty( StompCommandConstants::HEADER_SUBSCRIPTIONNAME,
info->getSubcriptionName() );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp Mon May 4 15:14:42 2009
@@ -41,10 +41,12 @@
const std::string destination =
frame->removeProperty( StompCommandConstants::HEADER_DESTINATION );
-
- // Destination creation.
message->setDestination( convertDestination( destination ) );
+ const std::string messageId =
+ frame->removeProperty( StompCommandConstants::HEADER_MESSAGEID );
+ message->setMessageId( convertMessageId( messageId ) );
+
// the standard JMS headers
message->setCorrelationId( StompCommandConstants::HEADER_CORRELATIONID );
@@ -134,8 +136,6 @@
std::vector<std::string>::const_iterator iter = keys.begin();
for( ; iter != keys.end(); ++iter ) {
-
- // TODO - This will fail if the type isn't string.
frame->setProperty( *iter, message->getMessageProperties().getString( *iter ) );
}
}