You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/02/09 03:16:04 UTC
svn commit: r505139 - in /incubator/qpid/branches/qpid.0-9: ./
cpp/lib/broker/ python/tests/
Author: astitcher
Date: Thu Feb 8 18:16:03 2007
New Revision: 505139
URL: http://svn.apache.org/viewvc?view=rev&rev=505139
Log:
r1104@fuschia: andrew | 2007-02-09 02:14:00 +0000
Initial implementation of Message.get delivery
Modified:
incubator/qpid/branches/qpid.0-9/ (props changed)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/python/tests/message.py
Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Feb 8 18:16:03 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1102
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1104
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Thu Feb 8 18:16:03 2007
@@ -289,7 +289,7 @@
void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
+ if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
connection.client->getBasic().getEmpty(context, clusterId);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Thu Feb 8 18:16:03 2007
@@ -204,8 +204,6 @@
}
}
-// FIXME aconway 2007-02-05: Drop exchange member, calculate from
-// message in ::complete().
void Channel::handlePublish(Message* _message){
Message::shared_ptr message(_message);
messageBuilder.initialise(message);
@@ -292,12 +290,13 @@
}
}
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- u_int64_t myDeliveryTag = currentDeliveryTag++;
+ u_int64_t myDeliveryTag = getNextSendRequestId();
msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ destination,
queue->getMessageCount() + 1, myDeliveryTag,
framesize);
if(ackExpected){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Thu Feb 8 18:16:03 2007
@@ -125,7 +125,7 @@
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, bool ackExpected);
+ bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
void begin();
void close();
void commit();
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Thu Feb 8 18:16:03 2007
@@ -89,7 +89,8 @@
sendContent(channel, framesize);
}
-void BasicMessage::sendGetOk(const MethodContext& context,
+void BasicMessage::sendGetOk(const MethodContext& context,
+ const std::string& /*destination*/,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize)
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Thu Feb 8 18:16:03 2007
@@ -78,6 +78,7 @@
u_int32_t framesize);
void sendGetOk(const framing::MethodContext&,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Thu Feb 8 18:16:03 2007
@@ -112,6 +112,7 @@
* Used to return a message in response to a get from a queue
*/
virtual void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize) = 0;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Thu Feb 8 18:16:03 2007
@@ -79,12 +79,38 @@
}
void MessageMessage::sendGetOk(
- const framing::MethodContext& /*context*/,
+ const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t /*messageCount*/,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
- // FIXME aconway 2007-02-05:
+ framing::ChannelAdapter* channel = context.channel;
+ channel->send(
+ new MessageTransferBody(channel->getVersion(),
+ transfer->getTicket(),
+ destination,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ transfer->getBody(),
+ transfer->getMandatory()));
}
bool MessageMessage::isComplete()
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Thu Feb 8 18:16:03 2007
@@ -60,6 +60,7 @@
u_int32_t framesize);
void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Thu Feb 8 18:16:03 2007
@@ -110,13 +110,13 @@
MessageHandlerImpl::get( const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
- const string& /*destination*/,
+ const string& destination,
bool noAck )
{
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channel->getId());
- if(channel.get(queue, !noAck))
+ if(channel.get(queue, destination, !noAck))
client.ok(context);
else
client.empty(context);
Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Thu Feb 8 18:16:03 2007
@@ -372,7 +372,7 @@
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "empty")
#repeat for no_ack=False
for i in range(11, 21):
@@ -383,8 +383,11 @@
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ # TODO: replace with below when we have batching
+ if(i in [11, 12, 13, 15, 17, 19]):
+ msg.ok()
#todo: when batching is available, test ack multiple
#if(i == 13):
@@ -394,7 +397,7 @@
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "empty")
#recover(requeue=True)
channel.message_recover(requeue=True)
@@ -405,19 +408,20 @@
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ msg.ok()
#channel.message_ack(delivery_tag=reply.delivery_tag)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "empty")
channel.message_recover(requeue=True)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "empty")
def test_reference_simple(self):
"""