You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/11 13:25:28 UTC

svn commit: r574551 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/qpid/client/ src/qpid/framing/ src/tests/

Author: gsim
Date: Tue Sep 11 04:25:27 2007
New Revision: 574551

URL: http://svn.apache.org/viewvc?rev=574551&view=rev
Log:
Moved old ClientChannel class from using basic to using message for publish & consume.
(Get and qos still use the basic class's definitions; that will be changed next.)


Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
    incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Tue Sep 11 04:25:27 2007
@@ -152,6 +152,9 @@
   def define_accessors(f)
     genl "void set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) { #{f.cppname} = _#{f.cppname}; }"
     genl "#{f.cpptype.ret} get#{f.name.caps}() const { return #{f.cppname}; }"
+    if (f.cpptype.name == "FieldTable")
+      genl "#{f.cpptype.name}& get#{f.name.caps}() { return #{f.cppname}; }"
+    end
   end
 
   def define_struct(s)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Tue Sep 11 04:25:27 2007
@@ -136,7 +136,7 @@
 }
 
 void Channel::consume(
-    Queue& queue, const std::string& tag, MessageListener* listener, 
+    Queue& _queue, const std::string& tag, MessageListener* listener, 
     AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
 
     if (tag.empty()) {
@@ -152,10 +152,11 @@
         c.ackMode = ackMode;
         c.lastDeliveryTag = 0;
     }
+    uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
     ScopedSync s(session, synch);
-    session.basicConsume(0, queue.getName(), tag, noLocal,
-                          ackMode == NO_ACK, false, !synch,
-                          fields ? *fields : FieldTable());
+    session.messageSubscribe(0, _queue.getName(), tag, noLocal, 
+                             confirmMode, 0/*pre-acquire*/, 
+                             false, fields ? *fields : FieldTable());
 }
         
 void Channel::cancel(const std::string& tag, bool synch) {
@@ -169,7 +170,7 @@
         consumers.erase(i);
     }
     ScopedSync s(session, synch);
-    session.basicCancel(tag);
+    session.messageCancel(tag);
 }
 
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
@@ -184,14 +185,13 @@
     }
 }
 
-void Channel::publish(const Message& msg, const Exchange& exchange,
+void Channel::publish(Message& msg, const Exchange& exchange,
                       const std::string& routingKey, 
-                      bool mandatory, bool immediate) {
+                      bool mandatory, bool /*immediate TODO-restore immediate?*/) {
 
-    const string e = exchange.getName();
-    string key = routingKey;
-
-    session.basicPublish(0, e, key, mandatory, immediate, msg);
+    msg.getDeliveryProperties().setRoutingKey(routingKey);
+    msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
+    session.messageTransfer((destination=exchange.getName(), content=msg));
 }
 
 void Channel::close()
@@ -222,20 +222,27 @@
     }
 }
 
+void Channel::dispatch(FrameSet& content, const std::string& destination)
+{
+    ConsumerMap::iterator i = consumers.find(destination);
+    if (i != consumers.end()) {
+        Message msg;
+        msg.populate(content);
+        i->second.listener->received(msg);
+    } else {
+        QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);                        
+    }               
+}
+
 void Channel::run() {
     try {
         while (true) {
             FrameSet::shared_ptr content = session.get();
             //need to dispatch this to the relevant listener:
             if (content->isA<BasicDeliverBody>()) {
-                ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
-                if (i != consumers.end()) {
-                    Message msg;
-                    msg.populate(*content);
-                    i->second.listener->received(msg);
-                } else {
-                    QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());                        
-                }               
+                dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
+            } else if (content->isA<MessageTransferBody>()) {
+                dispatch(*content, content->as<MessageTransferBody>()->getDestination());
             } else if (content->isA<BasicGetOkBody>()) {
                 gets.push(content);
             } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Tue Sep 11 04:25:27 2007
@@ -93,6 +93,8 @@
     void closeInternal();
     void join();
 
+    void dispatch(framing::FrameSet& msg, const std::string& destination);
+
     // FIXME aconway 2007-02-23: Get rid of friendships.
     friend class Connection;
 
@@ -301,7 +303,7 @@
      * receive this message on publication, the message will be
      * returned (see setReturnedMessageHandler()).
      */
-    void publish(const Message& msg, const Exchange& exchange,
+    void publish(Message& msg, const Exchange& exchange,
                  const std::string& routingKey, 
                  bool mandatory = false, bool immediate = false);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Tue Sep 11 04:25:27 2007
@@ -22,13 +22,7 @@
  *
  */
 #include <string>
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
 
 namespace qpid {
 namespace client {
@@ -39,49 +33,37 @@
  *
  * \ingroup clientapi
  */
-// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
-// basic header properties.
-class Message : public framing::BasicHeaderProperties, public framing::MethodContent {
-  public:
-    Message(const std::string& data_=std::string()) : data(data_) {}
-
-    const std::string& getData() const { return data; }
-    void setData(const std::string& _data) { data = _data; }
-
-    std::string getDestination() const { return destination; }
-    void setDestination(const std::string& dest) { destination = dest; }
-
-    // TODO aconway 2007-03-22: only needed for Basic.deliver support.
-    uint64_t getDeliveryTag() const { return deliveryTag; }
-    void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
+class Message : public framing::TransferContent 
+{
+public:
+    Message(const std::string& data_=std::string()) : TransferContent(data_) {}
 
-    bool isRedelivered() const { return redelivered; }
-    void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
+    std::string getDestination() const 
+    { 
+        return destination; 
+    }
+    
+    void setDestination(const std::string& dest) 
+    { 
+        destination = dest; 
+    }
 
-    framing::AMQHeaderBody getHeader() const
+    bool isRedelivered() const 
     { 
-        framing::AMQHeaderBody header;
-        BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
-        BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
-        properties->setContentLength(data.size());
-        return header;
+        return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); 
     }
 
-    //TODO: move this elsewhere (GRS 24/08/2007)
-    void populate(framing::FrameSet& frameset)
-    {
-        const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
-        if (properties) {
-            BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
-        }
-        frameset.getContent(data);
+    void setRedelivered(bool redelivered) { 
+        getDeliveryProperties().setRedelivered(redelivered); 
+    }
+
+    framing::FieldTable& getHeaders() 
+    { 
+        return getMessageProperties().getApplicationHeaders(); 
     }
 
-  private:
-    std::string data;
+private:
     std::string destination;
-    bool redelivered;
-    uint64_t deliveryTag;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Tue Sep 11 04:25:27 2007
@@ -61,4 +61,35 @@
     return *header.get<DeliveryProperties>(true);
 }
 
+void TransferContent::populate(const FrameSet& frameset)
+{
+    header = *frameset.getHeaders();
+    frameset.getContent(data);
+}
+
+const MessageProperties& TransferContent::getMessageProperties() const
+{
+    const MessageProperties* props = header.get<MessageProperties>();
+    if (!props) throw NoSuchPropertiesException();
+    return *props;
+}
+
+const DeliveryProperties& TransferContent::getDeliveryProperties() const
+{
+    const DeliveryProperties* props = header.get<DeliveryProperties>();
+    if (!props) throw NoSuchPropertiesException();
+    return *props;
+}
+
+bool TransferContent::hasMessageProperties() const
+{
+    return header.get<MessageProperties>();
+}
+
+bool TransferContent::hasDeliveryProperties() const
+{
+    return header.get<DeliveryProperties>();
+}
+
+
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Tue Sep 11 04:25:27 2007
@@ -21,13 +21,17 @@
 #ifndef _TransferContent_
 #define _TransferContent_
 
+#include "FrameSet.h"
 #include "MethodContent.h"
+#include "qpid/Exception.h"
 #include "qpid/framing/MessageProperties.h"
 #include "qpid/framing/DeliveryProperties.h"
 
 namespace qpid {
 namespace framing {
 
+struct NoSuchPropertiesException : public Exception {};
+
 class TransferContent : public MethodContent
 {
     AMQHeaderBody header;
@@ -37,9 +41,16 @@
     AMQHeaderBody getHeader() const;
     void setData(const std::string&);
     void appendData(const std::string&);
-    const std::string& getData() const;
     MessageProperties& getMessageProperties();
     DeliveryProperties& getDeliveryProperties();
+
+    const std::string& getData() const;
+    const MessageProperties& getMessageProperties() const;
+    const DeliveryProperties& getDeliveryProperties() const;
+    bool hasMessageProperties() const;
+    bool hasDeliveryProperties() const;
+
+    void populate(const FrameSet& frameset);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp Tue Sep 11 04:25:27 2007
@@ -47,6 +47,7 @@
 using qpid::TestCase;
 using qpid::TestOptions;
 using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
 using namespace std;
 
 class DummyRun : public TestCase
@@ -73,14 +74,14 @@
     const string topic;
     TestMap::iterator test;
     auto_ptr<Thread> runner;
-    string reportTo;
+    ReplyTo reportTo;
     string reportCorrelator;    
 
     void shutdown();
     bool invite(const string& name);
     void run();
 
-    void sendResponse(Message& response, string replyTo);
+    void sendResponse(Message& response, ReplyTo replyTo);
     void sendResponse(Message& response, Message& request);
     void sendSimpleResponse(const string& type, Message& request);
     void sendReport();
@@ -146,30 +147,19 @@
     response.getHeaders().setString("CONTROL_TYPE", type);
     response.getHeaders().setString("CLIENT_NAME", name);
     response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
-    response.setCorrelationId(request.getCorrelationId());
+    response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId());
     sendResponse(response, request);
 }
 
 void Listener::sendResponse(Message& response, Message& request)
 {
-    sendResponse(response, request.getReplyTo()); 
+    sendResponse(response, request.getMessageProperties().getReplyTo()); 
 }
 
-void Listener::sendResponse(Message& response, string replyTo)
+void Listener::sendResponse(Message& response, ReplyTo replyTo)
 {
-    //Exchange and routing key need to be extracted from the reply-to
-    //field. Format is assumed to be:
-    //
-    //    <exchange type>://<exchange name>/<routing key>?<options>
-    //
-    //and all we need is the exchange name and routing key
-    // 
-    if (replyTo.empty()) throw qpid::Exception("Reply address not set!"); 
-    const string delims(":/?=");
-
-    string::size_type start = replyTo.find(':');//skip exchange type
-    string exchange = parse_next_word(replyTo, delims, start);
-    string routingKey = parse_next_word(replyTo, delims, start);
+    string exchange = replyTo.getExchangeName();
+    string routingKey = replyTo.getRoutingKey();
     channel.publish(response, exchange, routingKey);
 }
 
@@ -188,12 +178,12 @@
         test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
         sendSimpleResponse("ACCEPT_ROLE", message);
     } else if (type == "START") {        
-        reportTo = message.getReplyTo();
-        reportCorrelator = message.getCorrelationId();
+        reportTo = message.getMessageProperties().getReplyTo();
+        reportCorrelator = message.getMessageProperties().getCorrelationId();
         runner = auto_ptr<Thread>(new Thread(this));
     } else if (type == "STATUS_REQUEST") {
-        reportTo = message.getReplyTo();
-        reportCorrelator = message.getCorrelationId();
+        reportTo = message.getMessageProperties().getReplyTo();
+        reportCorrelator = message.getMessageProperties().getCorrelationId();
         test->stop();
         sendReport();
     } else if (type == "TERMINATE") {
@@ -229,7 +219,7 @@
     Message report;
     report.getHeaders().setString("CONTROL_TYPE", "REPORT");
     test->report(report);
-    report.setCorrelationId(reportCorrelator);
+    report.getMessageProperties().setCorrelationId(reportCorrelator);
     sendResponse(report, reportTo);
 }