You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/05/04 17:14:43 UTC

svn commit: r771330 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat: openwire/marshal/PrimitiveMapMarshaller.cpp stomp/marshal/Marshaler.cpp stomp/marshal/MarshalerHelper.cpp

Author: tabish
Date: Mon May  4 15:14:42 2009
New Revision: 771330

URL: http://svn.apache.org/viewvc?rev=771330&view=rev
Log:
Somewhat functional Stomp WireFormat.  Client Ack doesn't seem to work; auto ack works fine.  Transactions are untested.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveMapMarshaller.cpp Mon May  4 15:14:42 2009
@@ -160,47 +160,47 @@
 
     try {
 
-        if( value.getValueType() == PrimitiveValueNode::BOOLEAN_TYPE ) {
+        if( value.getType() == PrimitiveValueNode::BOOLEAN_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::BOOLEAN_TYPE );
             dataOut.writeBoolean( value.getBool() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::BYTE_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::BYTE_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::BYTE_TYPE );
             dataOut.writeByte( value.getByte() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::CHAR_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::CHAR_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::CHAR_TYPE );
             dataOut.writeChar( value.getChar() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::SHORT_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::SHORT_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::SHORT_TYPE );
             dataOut.writeShort( value.getShort() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::INTEGER_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::INTEGER_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::INTEGER_TYPE );
             dataOut.writeInt( value.getInt() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::LONG_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::LONG_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::LONG_TYPE );
             dataOut.writeLong( value.getLong() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::FLOAT_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::FLOAT_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::FLOAT_TYPE );
             dataOut.writeFloat( value.getFloat() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::DOUBLE_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::DOUBLE_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::DOUBLE_TYPE );
             dataOut.writeDouble( value.getDouble() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::BYTE_ARRAY_TYPE );
 
@@ -209,7 +209,7 @@
             dataOut.writeInt( (int)data.size() );
             dataOut.write( data );
 
-        } else if( value.getValueType() == PrimitiveValueNode::STRING_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::STRING_TYPE ) {
 
             std::string data = value.getString();
 
@@ -222,12 +222,12 @@
                 dataOut.writeUTF( data );
             }
 
-        } else if( value.getValueType() == PrimitiveValueNode::LIST_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::LIST_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::LIST_TYPE );
             marshalPrimitiveList( dataOut, value.getList() );
 
-        } else if( value.getValueType() == PrimitiveValueNode::MAP_TYPE ) {
+        } else if( value.getType() == PrimitiveValueNode::MAP_TYPE ) {
 
             dataOut.writeByte( PrimitiveValueNode::MAP_TYPE );
             marshalPrimitiveMap( dataOut, value.getMap() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp Mon May  4 15:14:42 2009
@@ -28,7 +28,9 @@
 #include <activemq/commands/Response.h>
 #include <activemq/commands/BrokerError.h>
 #include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
 #include <activemq/commands/ConnectionInfo.h>
+#include <activemq/commands/ConsumerId.h>
 #include <activemq/commands/ExceptionResponse.h>
 #include <activemq/commands/ShutdownInfo.h>
 #include <activemq/commands/RemoveInfo.h>
@@ -41,7 +43,10 @@
 #include <decaf/lang/exceptions/ClassCastException.h>
 #include <decaf/lang/Boolean.h>
 #include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
 #include <decaf/io/IOException.h>
+#include <decaf/io/ByteArrayOutputStream.h>
+#include <decaf/io/DataOutputStream.h>
 
 using namespace activemq;
 using namespace activemq::core;
@@ -120,27 +125,35 @@
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<Command> Marshaler::unmarshalMessage( const Pointer<StompFrame>& frame ) {
 
-    Pointer<Message> message;
+    Pointer<MessageDispatch> messageDispatch( new MessageDispatch() );
 
-    // Convert from the Frame to standard message properties.
-    helper.convertProperties( frame, message );
-
-    // Check for Content length, that tells us if its a Text or Bytes Message
-    if( frame->removeProperty( StompCommandConstants::HEADER_CONTENTLENGTH ) != "" ) {
-        message.reset( new ActiveMQBytesMessage() );
+    // We created a unique id when we registered the subscription for the consumer;
+    // now extract it back to a consumer Id so the ActiveMQConnection can dispatch it
+    // correctly.
+    Pointer<ConsumerId> consumerId = helper.convertConsumerId(
+        frame->removeProperty( StompCommandConstants::HEADER_SUBSCRIPTION ) );
+    messageDispatch->setConsumerId( consumerId );
+
+    if( frame->hasProperty( StompCommandConstants::HEADER_CONTENTLENGTH ) ) {
+
+        Pointer<ActiveMQBytesMessage> message( new ActiveMQBytesMessage() );
+        frame->removeProperty( StompCommandConstants::HEADER_CONTENTLENGTH );
+        helper.convertProperties( frame, message );
         message->setContent( frame->getBody() );
+        messageDispatch->setMessage( message );
+        messageDispatch->setDestination( message->getDestination() );
+
     } else {
-        Pointer<ActiveMQTextMessage> txtMessage( new ActiveMQTextMessage() );
 
-        if( frame->getBodyLength() > 0 ) {
-            std::string text( (char*)( &(frame->getBody()[0]) ), frame->getBodyLength() );
-            txtMessage->setText( text );
-        }
+        Pointer<ActiveMQTextMessage> message( new ActiveMQTextMessage() );
+        helper.convertProperties( frame, message );
+        message->setText( (char*)&(frame->getBody()[0]) );
+        messageDispatch->setMessage( message );
+        messageDispatch->setDestination( message->getDestination() );
 
-        message = txtMessage.dynamicCast<Message>();
     }
 
-    return Pointer<Command>();
+    return messageDispatch;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -201,6 +214,11 @@
     Pointer<StompFrame> frame( new StompFrame() );
     frame->setCommand( StompCommandConstants::SEND );
 
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     // Convert the standard headers to the Stomp Format.
     helper.convertProperties( message, frame );
 
@@ -208,13 +226,15 @@
     try{
         Pointer<ActiveMQTextMessage> txtMessage = message.dynamicCast<ActiveMQTextMessage>();
         std::string text = txtMessage->getText();
-        frame->setBody( (unsigned char*)text.c_str(), text.length() );
+        frame->setBody( (unsigned char*)text.c_str(), text.length() + 1 );
         return frame;
     } catch( ClassCastException& ex ) {}
 
     try{
         Pointer<ActiveMQBytesMessage> bytesMessage = message.dynamicCast<ActiveMQBytesMessage>();
         frame->setBody( bytesMessage->getBodyBytes(), bytesMessage->getBodyLength() );
+        frame->setProperty( StompCommandConstants::HEADER_CONTENTLENGTH,
+                            Long::toString( bytesMessage->getBodyLength() ) );
         return frame;
     } catch( ClassCastException& ex ) {}
 
@@ -231,6 +251,12 @@
 
     Pointer<StompFrame> frame( new StompFrame() );
     frame->setCommand( StompCommandConstants::ACK );
+
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     frame->setProperty( StompCommandConstants::HEADER_MESSAGEID,
                         helper.convertMessageId( ack->getLastMessageId() ) );
 
@@ -239,7 +265,7 @@
                             helper.convertTransactionId( ack->getTransactionId() ) );
     }
 
-    return frame;
+    return Pointer<StompFrame>(); //frame;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -274,6 +300,11 @@
         frame->setCommand( StompCommandConstants::COMMIT );
     }
 
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     frame->setProperty( StompCommandConstants::HEADER_TRANSACTIONID,
                         helper.convertTransactionId( info->getTransactionId() ) );
 
@@ -284,7 +315,13 @@
 Pointer<StompFrame> Marshaler::marshalShutdownInfo( const Pointer<Command>& command AMQCPP_UNUSED ) {
 
     Pointer<StompFrame> frame( new StompFrame() );
-    frame->setCommand( StompCommandConstants::CONNECT );
+    frame->setCommand( StompCommandConstants::DISCONNECT );
+
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     return frame;
 }
 
@@ -295,6 +332,11 @@
     Pointer<StompFrame> frame( new StompFrame() );
     frame->setCommand( StompCommandConstants::UNSUBSCRIBE );
 
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     try{
         Pointer<ConsumerId> id = info->getObjectId().dynamicCast<ConsumerId>();
         frame->setProperty( StompCommandConstants::HEADER_ID, helper.convertConsumerId( id ) );
@@ -312,8 +354,17 @@
     Pointer<StompFrame> frame( new StompFrame() );
     frame->setCommand( StompCommandConstants::SUBSCRIBE );
 
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     frame->setProperty( StompCommandConstants::HEADER_DESTINATION,
                         helper.convertDestination( info->getDestination() ) );
+
+    // This creates a unique Id for this consumer using the connection id, session id and
+    // the consumer's id value; when we get a message this Id will be embedded in the
+    // Message's "subscription" property.
     frame->setProperty( StompCommandConstants::HEADER_ID,
                         helper.convertConsumerId( info->getConsumerId() ) );
 
@@ -327,7 +378,7 @@
                             info->getSelector() );
     }
 
-    frame->setProperty( StompCommandConstants::HEADER_ACK, "client" );
+    frame->setProperty( StompCommandConstants::HEADER_ACK, "auto" );
 
     if( info->isNoLocal() ) {
         frame->setProperty( StompCommandConstants::HEADER_NOLOCAL, "true" );
@@ -361,6 +412,11 @@
     Pointer<StompFrame> frame( new StompFrame() );
     frame->setCommand( StompCommandConstants::UNSUBSCRIBE );
 
+    if( command->isResponseRequired() ) {
+        frame->setProperty( StompCommandConstants::HEADER_RECEIPT_REQUIRED,
+                            Integer::toString( command->getCommandId() ) );
+    }
+
     frame->setProperty( StompCommandConstants::HEADER_ID, info->getClientId() );
     frame->setProperty( StompCommandConstants::HEADER_SUBSCRIPTIONNAME,
                         info->getSubcriptionName() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp?rev=771330&r1=771329&r2=771330&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/stomp/marshal/MarshalerHelper.cpp Mon May  4 15:14:42 2009
@@ -41,10 +41,12 @@
 
     const std::string destination =
         frame->removeProperty( StompCommandConstants::HEADER_DESTINATION );
-
-    // Destination creation.
     message->setDestination( convertDestination( destination ) );
 
+    const std::string messageId =
+        frame->removeProperty( StompCommandConstants::HEADER_MESSAGEID );
+    message->setMessageId( convertMessageId( messageId ) );
+
     // the standard JMS headers
     message->setCorrelationId( StompCommandConstants::HEADER_CORRELATIONID );
 
@@ -134,8 +136,6 @@
     std::vector<std::string>::const_iterator iter = keys.begin();
 
     for( ; iter != keys.end(); ++iter ) {
-
-        // TODO - This will fail if the type isn't string.
         frame->setProperty( *iter, message->getMessageProperties().getString( *iter ) );
     }
 }