You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/02/09 03:16:04 UTC

svn commit: r505139 - in /incubator/qpid/branches/qpid.0-9: ./ cpp/lib/broker/ python/tests/

Author: astitcher
Date: Thu Feb  8 18:16:03 2007
New Revision: 505139

URL: http://svn.apache.org/viewvc?view=rev&rev=505139
Log:
 r1104@fuschia:  andrew | 2007-02-09 02:14:00 +0000
 Initial implementation of Message.get delivery

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Feb  8 18:16:03 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1102
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1104

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Thu Feb  8 18:16:03 2007
@@ -289,7 +289,7 @@
         
 void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
     Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());    
-    if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
+    if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
         string clusterId;//not used, part of an imatix hack
 
         connection.client->getBasic().getEmpty(context, clusterId);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Thu Feb  8 18:16:03 2007
@@ -204,8 +204,6 @@
     }
 }
 
-// FIXME aconway 2007-02-05: Drop exchange member, calculate from
-// message in ::complete().
 void Channel::handlePublish(Message* _message){
     Message::shared_ptr message(_message);
     messageBuilder.initialise(message);
@@ -292,12 +290,13 @@
     }
 }
 
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
     Message::shared_ptr msg = queue->dequeue();
     if(msg){
         Mutex::ScopedLock locker(deliveryLock);
-        u_int64_t myDeliveryTag = currentDeliveryTag++;
+        u_int64_t myDeliveryTag = getNextSendRequestId();
         msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+        			   destination,
                        queue->getMessageCount() + 1, myDeliveryTag,
                        framesize);
         if(ackExpected){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Thu Feb  8 18:16:03 2007
@@ -125,7 +125,7 @@
                  bool exclusive, ConnectionToken* const connection = 0,
                  const framing::FieldTable* = 0);
     void cancel(const string& tag);
-    bool get(Queue::shared_ptr queue, bool ackExpected);
+    bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
     void begin();
     void close();
     void commit();

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Thu Feb  8 18:16:03 2007
@@ -89,7 +89,8 @@
     sendContent(channel, framesize);
 }
 
-void BasicMessage::sendGetOk(const MethodContext& context, 
+void BasicMessage::sendGetOk(const MethodContext& context,
+    					     const std::string& /*destination*/,
                              u_int32_t messageCount,
                              u_int64_t deliveryTag, 
                              u_int32_t framesize)

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Thu Feb  8 18:16:03 2007
@@ -78,6 +78,7 @@
                  u_int32_t framesize);
     
     void sendGetOk(const framing::MethodContext&, 
+				   const std::string& destination,
                    u_int32_t messageCount,
                    u_int64_t deliveryTag, 
                    u_int32_t framesize);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Thu Feb  8 18:16:03 2007
@@ -112,6 +112,7 @@
      * Used to return a message in response to a get from a queue
      */
     virtual void sendGetOk(const framing::MethodContext& context,
+    					   const std::string& destination,
                            u_int32_t messageCount,
                            u_int64_t deliveryTag, 
                            u_int32_t framesize) = 0;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Thu Feb  8 18:16:03 2007
@@ -79,12 +79,38 @@
 }
 
 void MessageMessage::sendGetOk(
-    const framing::MethodContext& /*context*/, 
+    const framing::MethodContext& context,
+	const std::string& destination,
     u_int32_t /*messageCount*/,
     u_int64_t /*deliveryTag*/, 
     u_int32_t /*framesize*/)
 {
-    // FIXME aconway 2007-02-05: 
+	framing::ChannelAdapter* channel = context.channel;
+    channel->send(
+    	new MessageTransferBody(channel->getVersion(), 
+                                transfer->getTicket(),
+                                destination,
+                                getRedelivered(),
+                                transfer->getImmediate(),
+                                transfer->getTtl(),
+                                transfer->getPriority(),
+                                transfer->getTimestamp(),
+                                transfer->getDeliveryMode(),
+                                transfer->getExpiration(),
+                                getExchange(),
+                                getRoutingKey(),
+                                transfer->getMessageId(),
+                                transfer->getCorrelationId(),
+                                transfer->getReplyTo(),
+                                transfer->getContentType(),
+                                transfer->getContentEncoding(),
+                                transfer->getUserId(),
+                                transfer->getAppId(),
+                                transfer->getTransactionId(),
+                                transfer->getSecurityToken(),
+                                transfer->getApplicationHeaders(),
+                                transfer->getBody(),
+                                transfer->getMandatory()));
 }
 
 bool MessageMessage::isComplete()

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Thu Feb  8 18:16:03 2007
@@ -60,6 +60,7 @@
                  u_int32_t framesize);
     
     void sendGetOk(const framing::MethodContext& context, 
+				   const std::string& destination,
                    u_int32_t messageCount,
                    u_int64_t deliveryTag, 
                    u_int32_t framesize);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Thu Feb  8 18:16:03 2007
@@ -110,13 +110,13 @@
 MessageHandlerImpl::get( const MethodContext& context,
                          u_int16_t /*ticket*/,
                          const string& queueName,
-                         const string& /*destination*/,
+                         const string& destination,
                          bool noAck )
 {
     Queue::shared_ptr queue =
         connection.getQueue(queueName, context.channel->getId());
     
-    if(channel.get(queue, !noAck))
+    if(channel.get(queue, destination, !noAck))
         client.ok(context);
     else 
         client.empty(context);

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=505139&r1=505138&r2=505139
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Thu Feb  8 18:16:03 2007
@@ -372,7 +372,7 @@
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")
-        self.assertEqual(reply.method.name, "get-empty")
+        self.assertEqual(reply.method.name, "empty")
 
         #repeat for no_ack=False
         for i in range(11, 21):
@@ -383,8 +383,11 @@
             reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
             self.assertEqual(reply.method.klass.name, "message")
             self.assertEqual(reply.method.name, "ok")
-            self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-            reply.ok()
+            msg = self.client.queue(tag).get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.body)
+            # TODO: replace with below when we have batching
+            if(i in [11, 12, 13, 15, 17, 19]):
+              msg.ok()
 
             #todo: when batching is available, test ack multiple
             #if(i == 13):
@@ -394,7 +397,7 @@
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")
-        self.assertEqual(reply.method.name, "get-empty")
+        self.assertEqual(reply.method.name, "empty")
 
         #recover(requeue=True)
         channel.message_recover(requeue=True)
@@ -405,19 +408,20 @@
             reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
             self.assertEqual(reply.method.klass.name, "message")
             self.assertEqual(reply.method.name, "ok")
-            self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-            reply.ok()
+            msg = self.client.queue(tag).get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.body)
+            msg.ok()
             #channel.message_ack(delivery_tag=reply.delivery_tag)
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")
-        self.assertEqual(reply.method.name, "get-empty")
+        self.assertEqual(reply.method.name, "empty")
 
         channel.message_recover(requeue=True)
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")
-        self.assertEqual(reply.method.name, "get-empty")
+        self.assertEqual(reply.method.name, "empty")
 
     def test_reference_simple(self):
         """