You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/19 12:46:08 UTC

svn commit: r882118 - in /qpid/trunk/qpid/cpp: examples/messaging/ include/qpid/messaging/ src/qpid/messaging/

Author: gsim
Date: Thu Nov 19 11:46:00 2009
New Revision: 882118

URL: http://svn.apache.org/viewvc?rev=882118&view=rev
Log:
QPID-664: Add spout & drain examples as per python client

Added:
    qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
    qpid/trunk/qpid/cpp/examples/messaging/spout.cpp
Modified:
    qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
    qpid/trunk/qpid/cpp/examples/messaging/Makefile.am
    qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/MapContent.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp

Modified: qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt Thu Nov 19 11:46:00 2009
@@ -17,6 +17,9 @@
 # under the License.
 #
 
+add_example(messaging drain)
+add_example(messaging spout)
+
 add_example(messaging queue_receiver)
 add_example(messaging queue_sender)
 

Modified: qpid/trunk/qpid/cpp/examples/messaging/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/Makefile.am?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/Makefile.am Thu Nov 19 11:46:00 2009
@@ -21,7 +21,13 @@
 MAKELDFLAGS=$(CLIENTFLAGS)
 include $(top_srcdir)/examples/makedist.mk
 
-noinst_PROGRAMS=queue_sender queue_receiver topic_sender topic_receiver client server map_sender map_receiver
+noinst_PROGRAMS=drain spout queue_sender queue_receiver topic_sender topic_receiver client server map_sender map_receiver
+
+drain_SOURCES=drain.cpp
+drain_LDADD=$(CLIENT_LIB)
+
+spout_SOURCES=spout.cpp
+spout_LDADD=$(CLIENT_LIB)
 
 queue_sender_SOURCES=queue_sender.cpp
 queue_sender_LDADD=$(CLIENT_LIB)

Added: qpid/trunk/qpid/cpp/examples/messaging/drain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/drain.cpp?rev=882118&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/drain.cpp (added)
+++ qpid/trunk/qpid/cpp/examples/messaging/drain.cpp Thu Nov 19 11:46:00 2009
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/Exception.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include <qpid/sys/Time.h>
+
+#include <iostream>
+
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+using qpid::sys::TIME_INFINITE;
+using qpid::sys::TIME_SEC;
+
+/** Command-line options for the drain example. */
+struct Options : public qpid::Options
+{
+    bool help;
+    std::string url;
+    std::string address;
+    int64_t timeout; // seconds; scaled by TIME_SEC in getTimeout()
+    bool forever;
+    qpid::log::Options log;
+
+    Options(const std::string& argv0=std::string())
+        : qpid::Options("Options"),
+          help(false),
+          url("amqp:tcp:127.0.0.1"),
+          timeout(0),
+          forever(false),
+          log(argv0)
+    {
+        addOptions()
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+            ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+            ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
+            ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
+            ("help", qpid::optValue(help), "print this usage statement");
+        add(log);
+    }
+
+    /** Returns TIME_INFINITE when --forever was given, otherwise --timeout converted from seconds. */
+    Duration getTimeout()
+    {
+        if (forever) return TIME_INFINITE;
+        return timeout*TIME_SEC;
+    }
+    /** Parses argv; false means do not run. --help is handled before the address check so it works alone. */
+    bool parse(int argc, char** argv)
+    {
+        try {
+            qpid::Options::parse(argc, argv);
+            if (help) {
+                std::cout << *this << std::endl << std::endl
+                          << "Drains messages from the specified address" << std::endl;
+                return false;
+            }
+            if (address.empty()) throw qpid::Exception("Address must be specified!");
+            qpid::log::Logger::instance().configure(log);
+            return true;
+        } catch (const std::exception& e) {
+            std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+            return false;
+        }
+    }
+};
+
+
+/** Prints and acknowledges every message fetched from the address; returns 0 on success, 1 otherwise. */
+int main(int argc, char** argv)
+{
+    Options options(argv[0]);
+    if (!options.parse(argc, argv)) return 1;
+    try {
+        Connection connection = Connection::open(options.url);
+        Session session = connection.newSession();
+        Receiver receiver = session.createReceiver(options.address);
+        Duration timeout = options.getTimeout();
+        Message message;
+        while (receiver.fetch(message, timeout)) {
+            std::cout << "Message(properties=" << message.getHeaders() << ", content='" ;
+            if (message.getContentType() == "amqp/map")
+                std::cout << MapView(message);
+            else
+                std::cout << message.getContent();
+            std::cout  << "')" << std::endl;
+            session.acknowledge();
+        }
+        receiver.cancel();
+        session.close();
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        // stdout carries the drained messages, so failures are reported on stderr.
+        std::cerr << error.what() << std::endl;
+        return 1;
+    }
+}
+
+

Added: qpid/trunk/qpid/cpp/examples/messaging/spout.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/spout.cpp?rev=882118&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/spout.cpp (added)
+++ qpid/trunk/qpid/cpp/examples/messaging/spout.cpp Thu Nov 19 11:46:00 2009
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/Exception.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include <qpid/sys/Time.h>
+
+#include <iostream>
+#include <vector>
+
+#include <boost/format.hpp>
+
+using namespace qpid::messaging;
+using qpid::framing::Uuid;
+using qpid::sys::AbsTime;
+using qpid::sys::now;
+using qpid::sys::TIME_INFINITE;
+
+typedef std::vector<std::string> string_vector;
+
+/** Command-line options for the spout example, plus helpers that apply them to a message. */
+struct Options : public qpid::Options
+{
+    bool help;
+    std::string url;
+    std::string address;
+    // NOTE(review): defaults to TIME_INFINITE; the help text does not state the unit of a
+    // user-supplied value -- confirm against how the timeout is consumed.
+    int64_t timeout;
+    uint count;
+    std::string id;
+    std::string replyto;
+    string_vector properties;
+    string_vector entries;
+    std::string content;
+    qpid::log::Options log;
+
+    Options(const std::string& argv0=std::string())
+        : qpid::Options("Options"),
+          help(false),
+          url("amqp:tcp:127.0.0.1"),
+          timeout(TIME_INFINITE),
+          count(1),
+          log(argv0)
+    {
+        addOptions()
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+            ("address,a", qpid::optValue(address, "ADDRESS"), "address to send to")
+            ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
+            ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
+            ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
+            ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
+            ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
+            ("entry,E", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
+            ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+            ("help", qpid::optValue(help), "print this usage statement");
+        add(log);
+    }
+
+    /** Parses argv; false means do not run. --help is handled before the address check so it works alone. */
+    bool parse(int argc, char** argv)
+    {
+        try {
+            qpid::Options::parse(argc, argv);
+            if (help) {
+                std::cout << *this << std::endl << std::endl
+                          << "Sends messages to the specified address" << std::endl;
+                return false;
+            }
+            if (address.empty()) throw qpid::Exception("Address must be specified!");
+            qpid::log::Logger::instance().configure(log);
+            return true;
+        } catch (const std::exception& e) {
+            std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+            return false;
+        }
+    }
+
+    /** Splits "NAME=VALUE" at the first '='. name is always set; value is set, and true
+     *  returned, only when a non-empty value follows the '='. */
+    static bool nameval(const std::string& in, std::string& name, std::string& value)
+    {
+        const std::string::size_type eq = in.find('=');
+        // substr(0, npos) yields the whole input, so this covers the "no '='" case too.
+        name = in.substr(0, eq);
+        if (eq == std::string::npos || eq+1 >= in.size()) return false;
+        value = in.substr(eq+1);
+        return true;
+    }
+
+    /** Sets one header from "NAME=VALUE"; without a value the header holds a default-constructed Variant. */
+    static void setProperty(Message& message, const std::string& property)
+    {
+        std::string key;
+        std::string text;
+        if (nameval(property, key, text)) {
+            message.getHeaders()[key] = text;
+        } else {
+            message.getHeaders()[key] = Variant();
+        }
+    }
+
+    /** Applies every --property argument to the message headers. */
+    void setProperties(Message& message) const
+    {
+        for (string_vector::const_iterator p = properties.begin(); p != properties.end(); ++p) {
+            setProperty(message, *p);
+        }
+    }
+
+    /** Copies every --entry argument into the map content, using the same rules as setProperty. */
+    void setEntries(MapContent& content) const
+    {
+        for (string_vector::const_iterator e = entries.begin(); e != entries.end(); ++e) {
+            std::string key;
+            std::string text;
+            if (nameval(*e, key, text)) {
+                content[key] = text;
+            } else {
+                content[key] = Variant();
+            }
+        }
+    }
+};
+
+
+/** Builds one message from the options and sends it repeatedly; returns 0 on success, 1 otherwise. */
+int main(int argc, char** argv)
+{
+    Options options(argv[0]);
+    if (!options.parse(argc, argv)) return 1;
+    try {
+        Connection connection = Connection::open(options.url);
+        Session session = connection.newSession();
+        Sender sender = session.createSender(options.address);
+        Message message;
+        options.setProperties(message);
+        if (options.entries.size()) {
+            MapContent content(message);
+            options.setEntries(content);
+            content.encode();
+        } else if (options.content.size()) {
+            message.setContent(options.content);
+            message.setContentType("text/plain; charset=utf8");
+        }
+        if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto));
+        // One id for the whole run, so "spout-id" is <id>:<sequence> for every message sent.
+        const std::string id = options.id.empty() ? Uuid(true).str() : options.id;
+        AbsTime end(now(), options.timeout);
+        for (uint count = 0; (count < options.count || options.count == 0) && end > now(); count++) {
+            message.getHeaders()["spout-id"] = (boost::format("%1%:%2%") % id % count).str();
+            sender.send(message);
+        }
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+        return 1;
+    }
+}
+
+

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Thu Nov 19 11:46:00 2009
@@ -58,8 +58,8 @@
     QPID_CLIENT_EXTERN void setContentType(const std::string&);
     QPID_CLIENT_EXTERN const std::string& getContentType() const;
 
-    QPID_CLIENT_EXTERN const VariantMap& getHeaders() const;
-    QPID_CLIENT_EXTERN VariantMap& getHeaders();
+    QPID_CLIENT_EXTERN const Variant::Map& getHeaders() const;
+    QPID_CLIENT_EXTERN Variant::Map& getHeaders();
 
     QPID_CLIENT_EXTERN const std::string& getContent() const;
     QPID_CLIENT_EXTERN std::string& getContent();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Thu Nov 19 11:46:00 2009
@@ -95,7 +95,7 @@
     std::stringstream out;
     out << impl->name;
     if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject;
-    if (!impl->options.empty()) out << OPTIONS_DIVIDER << " {" << impl->options << "}";
+    if (!impl->options.empty()) out << OPTIONS_DIVIDER << impl->options;
     return out.str();
 }
 Address::operator bool() const { return !impl->name.empty(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MapContent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MapContent.cpp?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MapContent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MapContent.cpp Thu Nov 19 11:46:00 2009
@@ -41,6 +41,7 @@
     {
         qpid::client::amqp0_10::MapCodec codec;
         codec.encode(*this, msg->getContent());
+        msg->setContentType(qpid::client::amqp0_10::MapCodec::contentType);
     }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=882118&r1=882117&r2=882118&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp Thu Nov 19 11:46:00 2009
@@ -566,19 +566,23 @@
 
 std::ostream& operator<<(std::ostream& out, const Variant::Map& map)
 {
+    out << "{";
     for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
         if (i != map.begin()) out << ", ";
         out << i->first << ":" << i->second;
     }
+    out << "}";
     return out;
 }
 
 std::ostream& operator<<(std::ostream& out, const Variant::List& list)
 {
+    out << "[";
     for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
         if (i != list.begin()) out << ", ";
         out << *i;
     }
+    out << "]";
     return out;
 }
 
@@ -586,10 +590,10 @@
 {
     switch (value.getType()) {
       case VAR_MAP:
-        out << "{" << value.asMap() << "}";
+        out << value.asMap();
         break;
       case VAR_LIST:
-        out << "[" << value.asList() << "]";
+        out << value.asList();
         break;
       case VAR_VOID:
         out << "<void>";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org