You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/06 12:44:39 UTC
svn commit: r634229 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/framing/ cpp/xml/ python/ python/tests_0-10/
Author: gsim
Date: Thu Mar 6 03:44:36 2008
New Revision: 634229
URL: http://svn.apache.org/viewvc?rev=634229&view=rev
Log:
Fix message delivery for 0-10 final codepath
Convert two more python tests to use 0-10 client
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
incubator/qpid/trunk/qpid/cpp/xml/extra.xml
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Mar 6 03:44:36 2008
@@ -37,29 +37,30 @@
void MessageBuilder::handle(AMQFrame& frame)
{
+ uint8_t type = frame.getBody()->type();
switch(state) {
case METHOD:
- checkType(METHOD_BODY, frame.getBody()->type());
+ checkType(METHOD_BODY, type);
state = HEADER;
break;
case HEADER:
- switch (frame.getBody()->type()) {
- case CONTENT_BODY:
- //TODO: rethink how to handle non-existent headers...
+ if (type == CONTENT_BODY) {
+ //TODO: rethink how to handle non-existent headers(?)...
//didn't get a header: add in a dummy
- message->getFrames().append(AMQFrame(AMQHeaderBody()));
- break;
- case HEADER_BODY:
- break;
- default:
+ AMQFrame header;
+ header.setBody(AMQHeaderBody());
+ header.setBof(false);
+ header.setEof(false);
+ message->getFrames().append(header);
+ } else if (type != HEADER_BODY) {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header or content got "
- << type_str(frame.getBody()->type()) << ")"));
+ << type_str(type) << ")"));
}
state = CONTENT;
break;
case CONTENT:
- checkType(CONTENT_BODY, frame.getBody()->type());
+ checkType(CONTENT_BODY, type);
break;
default:
throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Thu Mar 6 03:44:36 2008
@@ -82,9 +82,10 @@
const std::string destination;
const u_int8_t confirmMode;
const u_int8_t acquireMode;
+ const bool isPreview;
- MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
- destination(d), confirmMode(c), acquireMode(a) {}
+ MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a, bool p) :
+ destination(d), confirmMode(c), acquireMode(a), isPreview(p) {}
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
@@ -92,9 +93,14 @@
if (msg->getRedelivered()){
msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, destination,
- confirmMode, acquireMode));
+ if (isPreview) {
+ return AMQFrame(in_place<MessageTransferBody>(
+ ProtocolVersion(), 0, destination,
+ confirmMode, acquireMode));
+ } else {
+ return AMQFrame(in_place<Message010TransferBody>(
+ ProtocolVersion(), destination, confirmMode, acquireMode));
+ }
}
};
@@ -114,7 +120,13 @@
DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode, u_int8_t acquireMode)
{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode, u_int8_t acquireMode)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
}
void MessageDelivery::deliver(QueuedMessage& msg,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h Thu Mar 6 03:44:36 2008
@@ -40,6 +40,9 @@
public:
static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
+ static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode);
static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode,
u_int8_t acquireMode);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Thu Mar 6 03:44:36 2008
@@ -137,7 +137,7 @@
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
string tag = destination;
- state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Mar 6 03:44:36 2008
@@ -32,6 +32,8 @@
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Message010TransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -344,8 +346,12 @@
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- std::string exchangeName = msg->getExchangeName();
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ std::string exchangeName = msg->getExchangeName();
+ if (msg->isA<MessageTransferBody>()) {
+ msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ } else if (msg->isA<Message010TransferBody>()) {
+ msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName);
+ }
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h Thu Mar 6 03:44:36 2008
@@ -26,6 +26,8 @@
#include "Buffer.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties010.h"
+#include "qpid/framing/MessageProperties010.h"
#include <iostream>
#include <boost/optional.hpp>
@@ -75,8 +77,10 @@
};
// Could use boost::mpl::fold to construct a larger set.
- typedef PropSet<PropSet<Empty, DeliveryProperties>,
- MessageProperties> Properties;
+ typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>,
+ MessageProperties>,
+ DeliveryProperties010>,
+ MessageProperties010> Properties;
Properties properties;
Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Thu Mar 6 03:44:36 2008
@@ -33,6 +33,40 @@
</doc>
</domain>
+ <domain name="delivery-properties-010">
+ <struct size="long" pack="short" type="1025">
+ <field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/>
+ <field name="immediate" domain="bit" label="Consider message unroutable if it cannot be
+ processed immediately"/>
+ <field name="redelivered" domain="bit" label="redelivery flag"/>
+ <field name="priority" domain="octet" label="message priority, 0 to 9"
+ required="true"/>
+ <field name="delivery-mode" domain="octet" label="message persistence requirement"
+ required="true"/>
+ <field name="ttl" domain="longlong" label="time to live in ms"/>
+ <field name="timestamp" domain="longlong" label="message timestamp"/>
+ <field name="expiration" domain="longlong" label="message expiration time"/>
+ <field name="exchange" domain="shortstr" label="originating exchange"/>
+ <field name="routing-key" domain="shortstr" label="message routing key"/>
+ <field name="resume-id" domain="mediumstr" label="global id for message transfer"/>
+ <field name="resume-ttl" domain="longlong" label="ttl in ms for interrupted message data"/>
+ </struct>
+ </domain>
+
+ <domain name="message-properties-010">
+ <struct size="long" pack="short" type="1027">
+ <field name="content-length" domain="longlong" label="length of the body segment in bytes"/>
+ <field name="message-id" domain="uuid" label="application message identifier"/>
+ <field name="correlation-id" domain="mediumstr" label="application correlation identifier"/>
+ <field name="reply-to" domain="reply-to" label="destination to reply to"/>
+ <field name="content-type" domain="shortstr" label="MIME content type"/>
+ <field name="content-encoding" domain="shortstr" label="MIME content encoding"/>
+ <field name="user-id" domain="mediumstr" label="creating user id"/>
+ <field name="app-id" domain="mediumstr" label="creating application id"/>
+ <field name="application-headers" domain="table" label="application specific headers table"/>
+ </struct>
+ </domain>
+
<class name = "connection010" index = "1">
<method name = "start" index="1">
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Thu Mar 6 03:44:36 2008
@@ -2,9 +2,6 @@
tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
tests_0-10.query.QueryTests.test_exchange_bound_header
-tests_0-10.persistence.PersistenceTests.test_ack_message_from_deleted_queue
-tests_0-10.persistence.PersistenceTests.test_delete_queue_after_publish
-tests_0-10.persistence.PersistenceTests.test_queue_deletion
tests_0-10.tx.TxTests.test_auto_rollback
tests_0-10.tx.TxTests.test_commit
tests_0-10.tx.TxTests.test_rollback
@@ -92,6 +89,4 @@
tests_0-10.broker.BrokerTests.test_closed_channel
tests_0-10.broker.BrokerTests.test_ack_and_no_ack
tests_0-10.broker.BrokerTests.test_invalid_channel
-tests_0-10.broker.BrokerTests.test_simple_delivery_queued
-tests_0-10.broker.BrokerTests.test_simple_delivery_immediate
tests_0-10.example.ExampleTest.test_example
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?rev=634229&r1=634228&r2=634229&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Thu Mar 6 03:44:36 2008
@@ -70,7 +70,7 @@
body = "Immediate Delivery"
session.message_transfer("amq.fanout", None, None, Message(body))
msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
+ self.assert_(msg.body == body)
def test_simple_delivery_queued(self):
"""
@@ -89,7 +89,7 @@
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
queue = session.incoming(consumer_tag)
msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
+ self.assert_(msg.body == body)
def test_invalid_channel(self):
channel = self.client.channel(200)