You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/06 12:44:39 UTC

svn commit: r634229 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/xml/ python/ python/tests_0-10/

Author: gsim
Date: Thu Mar  6 03:44:36 2008
New Revision: 634229

URL: http://svn.apache.org/viewvc?rev=634229&view=rev
Log:
Fix message delivery for 0-10 final codepath
Convert two more python tests to use 0-10 client


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
    incubator/qpid/trunk/qpid/cpp/xml/extra.xml
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/tests_0-10/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Mar  6 03:44:36 2008
@@ -37,29 +37,30 @@
 
 void MessageBuilder::handle(AMQFrame& frame)
 {
+    uint8_t type = frame.getBody()->type();
     switch(state) {
     case METHOD:
-        checkType(METHOD_BODY, frame.getBody()->type());
+        checkType(METHOD_BODY, type);
         state = HEADER;
         break;
     case HEADER:
-        switch (frame.getBody()->type()) {
-        case CONTENT_BODY:
-            //TODO: rethink how to handle non-existent headers...
+        if (type == CONTENT_BODY) {
+            //TODO: rethink how to handle non-existent headers(?)...
             //didn't get a header: add in a dummy
-            message->getFrames().append(AMQFrame(AMQHeaderBody()));            
-            break;
-        case HEADER_BODY:            
-            break;
-        default:
+            AMQFrame header;
+            header.setBody(AMQHeaderBody());
+            header.setBof(false);
+            header.setEof(false);
+            message->getFrames().append(header);            
+        } else if (type != HEADER_BODY) {
             throw CommandInvalidException(
                 QPID_MSG("Invalid frame sequence for message, expected header or content got " 
-                         << type_str(frame.getBody()->type()) << ")"));
+                         << type_str(type) << ")"));
         }
         state = CONTENT;
         break;
     case CONTENT:
-        checkType(CONTENT_BODY, frame.getBody()->type());
+        checkType(CONTENT_BODY, type);
         break;
     default:
         throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Thu Mar  6 03:44:36 2008
@@ -82,9 +82,10 @@
     const std::string destination;
     const u_int8_t confirmMode;
     const u_int8_t acquireMode;
+    const bool isPreview;
 
-    MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : 
-        destination(d), confirmMode(c), acquireMode(a) {}
+    MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a, bool p) : 
+        destination(d), confirmMode(c), acquireMode(a), isPreview(p) {}
 
     AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
     {
@@ -92,9 +93,14 @@
         if (msg->getRedelivered()){
             msg->getProperties<DeliveryProperties>()->setRedelivered(true);
         }
-        return AMQFrame(in_place<MessageTransferBody>(
-                            ProtocolVersion(), 0, destination,
-                            confirmMode, acquireMode));
+        if (isPreview) {
+            return AMQFrame(in_place<MessageTransferBody>(
+                ProtocolVersion(), 0, destination,
+                confirmMode, acquireMode));
+        } else {
+            return AMQFrame(in_place<Message010TransferBody>(
+                ProtocolVersion(), destination, confirmMode, acquireMode));
+        }
     }
 };
 
@@ -114,7 +120,13 @@
 DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, 
                                                                   u_int8_t confirmMode, u_int8_t acquireMode)
 {
-    return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
+    return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination, 
+                                                                  u_int8_t confirmMode, u_int8_t acquireMode)
+{
+    return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
 }
 
 void MessageDelivery::deliver(QueuedMessage& msg, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h Thu Mar  6 03:44:36 2008
@@ -40,6 +40,9 @@
 public:
     static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
     static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
+    static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination, 
+                                                                           u_int8_t confirmMode, 
+                                                                           u_int8_t acquireMode);
     static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, 
                                                                     u_int8_t confirmMode, 
                                                                     u_int8_t acquireMode);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Thu Mar  6 03:44:36 2008
@@ -137,7 +137,7 @@
         throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
 
     string tag = destination;
-    state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
+    state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode), 
                     tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Mar  6 03:44:36 2008
@@ -32,6 +32,8 @@
 #include "TxAck.h"
 #include "TxPublish.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Message010TransferBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
 
@@ -344,8 +346,12 @@
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    std::string exchangeName = msg->getExchangeName();      
-    msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+    std::string exchangeName = msg->getExchangeName();
+    if (msg->isA<MessageTransferBody>()) {
+        msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+    } else if (msg->isA<Message010TransferBody>()) {
+        msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName);
+    }
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h Thu Mar  6 03:44:36 2008
@@ -26,6 +26,8 @@
 #include "Buffer.h"
 #include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties010.h"
+#include "qpid/framing/MessageProperties010.h"
 #include <iostream>
 
 #include <boost/optional.hpp>
@@ -75,8 +77,10 @@
     };
 
     // Could use boost::mpl::fold to construct a larger set.
-    typedef PropSet<PropSet<Empty, DeliveryProperties>,
-                    MessageProperties> Properties;
+    typedef  PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>, 
+                                       MessageProperties>, 
+                               DeliveryProperties010>, 
+                      MessageProperties010> Properties;
 
     Properties properties;
     

Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Thu Mar  6 03:44:36 2008
@@ -33,6 +33,40 @@
     </doc>
   </domain>
 
+  <domain name="delivery-properties-010">
+    <struct size="long" pack="short" type="1025">
+      <field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/>
+      <field name="immediate" domain="bit" label="Consider message unroutable if it cannot be
+        processed immediately"/>
+      <field name="redelivered" domain="bit" label="redelivery flag"/>
+      <field name="priority" domain="octet" label="message priority, 0 to 9"
+        required="true"/>
+      <field name="delivery-mode" domain="octet" label="message persistence requirement"
+        required="true"/>
+      <field name="ttl" domain="longlong" label="time to live in ms"/>
+      <field name="timestamp" domain="longlong" label="message timestamp"/>
+      <field name="expiration" domain="longlong" label="message expiration time"/>
+      <field name="exchange" domain="shortstr" label="originating exchange"/>
+      <field name="routing-key" domain="shortstr" label="message routing key"/>
+      <field name="resume-id" domain="mediumstr" label="global id for message transfer"/>
+      <field name="resume-ttl" domain="longlong" label="ttl in ms for interrupted message data"/>
+    </struct>
+  </domain>
+
+  <domain name="message-properties-010">
+    <struct size="long" pack="short" type="1027">
+      <field name="content-length" domain="longlong" label="length of the body segment in bytes"/>
+      <field name="message-id" domain="uuid" label="application message identifier"/>
+      <field name="correlation-id" domain="mediumstr" label="application correlation identifier"/>
+      <field name="reply-to" domain="reply-to" label="destination to reply to"/>
+      <field name="content-type" domain="shortstr" label="MIME content type"/>
+      <field name="content-encoding" domain="shortstr" label="MIME content encoding"/>
+      <field name="user-id" domain="mediumstr" label="creating user id"/>
+      <field name="app-id" domain="mediumstr" label="creating application id"/>
+      <field name="application-headers" domain="table" label="application specific headers table"/>
+    </struct>
+  </domain>
+
 <class name = "connection010" index = "1">
 
 <method name = "start" index="1">

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Thu Mar  6 03:44:36 2008
@@ -2,9 +2,6 @@
 tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
 tests.codec.FieldTableTestCase.test_field_table_name_value_pair
 tests_0-10.query.QueryTests.test_exchange_bound_header
-tests_0-10.persistence.PersistenceTests.test_ack_message_from_deleted_queue
-tests_0-10.persistence.PersistenceTests.test_delete_queue_after_publish
-tests_0-10.persistence.PersistenceTests.test_queue_deletion
 tests_0-10.tx.TxTests.test_auto_rollback
 tests_0-10.tx.TxTests.test_commit
 tests_0-10.tx.TxTests.test_rollback
@@ -92,6 +89,4 @@
 tests_0-10.broker.BrokerTests.test_closed_channel
 tests_0-10.broker.BrokerTests.test_ack_and_no_ack
 tests_0-10.broker.BrokerTests.test_invalid_channel
-tests_0-10.broker.BrokerTests.test_simple_delivery_queued
-tests_0-10.broker.BrokerTests.test_simple_delivery_immediate
 tests_0-10.example.ExampleTest.test_example

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Thu Mar  6 03:44:36 2008
@@ -70,7 +70,7 @@
         body = "Immediate Delivery"
         session.message_transfer("amq.fanout", None, None, Message(body))
         msg = queue.get(timeout=5)
-        self.assert_(msg.content.body == body)
+        self.assert_(msg.body == body)
 
     def test_simple_delivery_queued(self):
         """
@@ -89,7 +89,7 @@
         session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
         queue = session.incoming(consumer_tag)
         msg = queue.get(timeout=5)
-        self.assert_(msg.content.body == body)
+        self.assert_(msg.body == body)
 
     def test_invalid_channel(self):
         channel = self.client.channel(200)