You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/01/12 13:02:38 UTC

svn commit: r898296 - in /qpid/trunk/qpid/cpp: examples/messaging/ include/qpid/messaging/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: gsim
Date: Tue Jan 12 12:02:38 2010
New Revision: 898296

URL: http://svn.apache.org/viewvc?rev=898296&view=rev
Log:
QPID-664: Added some of the missing standard message headers. Added two new test utilities for sending and receiving with the new API (both still works in progress).

Added:
    qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
Modified:
    qpid/trunk/qpid/cpp/examples/messaging/readme.txt
    qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: qpid/trunk/qpid/cpp/examples/messaging/readme.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/readme.txt?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/readme.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/readme.txt Tue Jan 12 12:02:38 2010
@@ -137,6 +137,8 @@
 
 TODO:
 
+* auto-creating exchanges of different types
+
 * xml content in the xquery example
 
 * 'durable' and 'reliable' subscriptions

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Tue Jan 12 12:02:38 2010
@@ -58,6 +58,23 @@
     QPID_CLIENT_EXTERN void setContentType(const std::string&);
     QPID_CLIENT_EXTERN const std::string& getContentType() const;
 
+    QPID_CLIENT_EXTERN void setMessageId(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getMessageId() const;
+
+    QPID_CLIENT_EXTERN void setUserId(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getUserId() const;
+
+    QPID_CLIENT_EXTERN void setCorrelationId(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getCorrelationId() const;
+
+    QPID_CLIENT_EXTERN void setTtl(uint64_t ttl);
+    QPID_CLIENT_EXTERN uint64_t getTtl() const;
+
+    QPID_CLIENT_EXTERN void setDurable(bool durable);
+    QPID_CLIENT_EXTERN bool getDurable() const;
+
+    QPID_CLIENT_EXTERN bool isRedelivered() const;
+
     QPID_CLIENT_EXTERN const Variant::Map& getHeaders() const;
     QPID_CLIENT_EXTERN Variant::Map& getHeaders();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Tue Jan 12 12:02:38 2010
@@ -264,22 +264,32 @@
     parent.retrieve(content, message);
 }
 
+
+namespace {
+//TODO: unify conversion to and from 0-10 message that is currently
+//split between IncomingMessages and OutgoingMessage
+const std::string SUBJECT("subject");
+}
+
 void populateHeaders(qpid::messaging::Message& message, 
                      const DeliveryProperties* deliveryProperties, 
                      const MessageProperties* messageProperties)
 {
     if (deliveryProperties) {
-        message.setSubject(deliveryProperties->getRoutingKey());
-        //TODO: convert other delivery properties
+        message.setTtl(deliveryProperties->getTtl());
+        message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
+        MessageImplAccess::get(message).redelivered = deliveryProperties->getRedelivered();
     }
     if (messageProperties) {
         message.setContentType(messageProperties->getContentType());
         if (messageProperties->hasReplyTo()) {
             message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
         }
+        message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT));
         message.getHeaders().clear();
         translate(messageProperties->getApplicationHeaders(), message.getHeaders());
-        //TODO: convert other message properties
+        message.setCorrelationId(messageProperties->getCorrelationId());
+        message.setUserId(messageProperties->getUserId());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Tue Jan 12 12:02:38 2010
@@ -25,6 +25,7 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
+#include "qpid/framing/enum.h"
 
 namespace qpid {
 namespace client {
@@ -32,20 +33,25 @@
 
 using qpid::messaging::Address;
 using qpid::messaging::MessageImplAccess;
+using namespace qpid::framing::message;
 
 void OutgoingMessage::convert(const qpid::messaging::Message& from)
 {
     //TODO: need to avoid copying as much as possible
     message.setData(from.getContent());
     message.getMessageProperties().setContentType(from.getContentType());
+    message.getMessageProperties().setCorrelationId(from.getCorrelationId());
+    message.getMessageProperties().setUserId(from.getUserId());
     const Address& address = from.getReplyTo();
     if (address) {
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
     translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
-    //TODO: set other message properties
-    message.getDeliveryProperties().setRoutingKey(from.getSubject());
-    //TODO: set other delivery properties
+    message.getDeliveryProperties().setTtl(from.getTtl());
+    if (from.getDurable()) {
+        message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+    }
+
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp Tue Jan 12 12:02:38 2010
@@ -41,6 +41,23 @@
 void Message::setContentType(const std::string& s) { impl->setContentType(s); }
 const std::string& Message::getContentType() const { return impl->getContentType(); }
 
+void Message::setMessageId(const std::string& id) { impl->messageId = id; }
+const std::string& Message::getMessageId() const { return impl->messageId; }
+
+void Message::setUserId(const std::string& id) { impl->userId = id; }
+const std::string& Message::getUserId() const { return impl->userId; }
+
+void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
+const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+
+void Message::setTtl(uint64_t ttl) { impl->ttl = ttl; }
+uint64_t Message::getTtl() const { return impl->ttl; }
+
+void Message::setDurable(bool durable) { impl->durable = durable; }
+bool Message::getDurable() const { return impl->durable; }
+
+bool Message::isRedelivered() const { return impl->redelivered; }
+
 const VariantMap& Message::getHeaders() const { return impl->getHeaders(); }
 VariantMap& Message::getHeaders() { return impl->getHeaders(); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Tue Jan 12 12:02:38 2010
@@ -28,8 +28,18 @@
 const std::string EMPTY_STRING = "";
 }
 
-MessageImpl::MessageImpl(const std::string& c) : bytes(c), internalId(0) {}
-MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), internalId(0) {}
+MessageImpl::MessageImpl(const std::string& c) : 
+    ttl(0),
+    durable(false),
+    redelivered(false),
+    bytes(c),
+    internalId(0) {}
+MessageImpl::MessageImpl(const char* chars, size_t count) : 
+    ttl(0),
+    durable (false),
+    redelivered(false),
+    bytes(chars, count),
+    internalId(0) {}
 
 void MessageImpl::setReplyTo(const Address& d) { replyTo = d; }
 const Address& MessageImpl::getReplyTo() const { return replyTo; }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h Tue Jan 12 12:02:38 2010
@@ -33,6 +33,12 @@
     Address replyTo;
     std::string subject;
     std::string contentType;
+    std::string messageId;
+    std::string userId;
+    std::string correlationId;
+    uint64_t ttl;
+    bool durable;
+    bool redelivered;
     Variant::Map headers;
 
     std::string bytes;

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=898296&r1=898295&r2=898296&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan 12 12:02:38 2010
@@ -162,6 +162,20 @@
   ConnectionOptions.h
 sender_LDADD = $(lib_client)
 
+qpidtest_PROGRAMS += qpid_recv
+qpid_recv_SOURCES = \
+  qpid_recv.cpp \
+  TestOptions.h \
+  ConnectionOptions.h
+qpid_recv_LDADD = $(lib_client)
+
+qpidtest_PROGRAMS += qpid_send
+qpid_send_SOURCES = \
+  qpid_send.cpp \
+  TestOptions.h \
+  ConnectionOptions.h
+qpid_send_LDADD = $(lib_client)
+
 qpidtest_PROGRAMS+=perftest
 perftest_SOURCES=perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
 perftest_INCLUDES=$(PUBLIC_INCLUDES)

Added: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=898296&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Tue Jan 12 12:02:38 2010
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include <qpid/sys/Time.h>
+#include "TestOptions.h"
+
+#include <iostream>
+
+
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+using qpid::sys::TIME_INFINITE;
+using qpid::sys::TIME_SEC;
+
+using namespace std;
+
+namespace qpid {
+namespace tests {
+
+struct Options : public qpid::Options
+{
+    bool help;
+    std::string url;
+    std::string address;
+    int64_t timeout;
+    bool forever;
+    uint messages;
+    bool ignoreDuplicates;
+    uint capacity;
+    uint ackFrequency;
+    bool printHeaders;
+    qpid::log::Options log;
+
+    Options(const std::string& argv0=std::string())
+        : qpid::Options("Options"),
+          help(false),
+          url("amqp:tcp:127.0.0.1"),
+          timeout(0),
+          forever(false),
+          messages(0),
+          ignoreDuplicates(false),
+          capacity(0),
+          ackFrequency(1),
+          printHeaders(false),
+          log(argv0)
+    {
+        addOptions()
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+            ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+            ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
+            ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
+            ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
+            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+            ("credit-window", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+            ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+            ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+            ("help", qpid::optValue(help), "print this usage statement");
+        add(log);
+    }
+
+    Duration getTimeout()
+    {
+        if (forever) return TIME_INFINITE;
+        else return timeout*TIME_SEC;
+
+    }
+    bool parse(int argc, char** argv)
+    {
+        try {
+            qpid::Options::parse(argc, argv);
+            if (address.empty()) throw qpid::Exception("Address must be specified!");
+            qpid::log::Logger::instance().configure(log);
+            if (help) {
+                std::ostringstream msg;
+                std::cout << msg << *this << std::endl << std::endl 
+                          << "Drains messages from the specified address" << std::endl;
+                return false;
+            } else {
+                return true;
+            }
+        } catch (const std::exception& e) {
+            std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+            return false;
+        }
+    }
+};
+
+struct Args : public qpid::TestOptions
+{
+    string address;
+    uint messages;
+    bool ignoreDuplicates;
+    uint capacity;
+    uint ackFrequency;
+
+    Args() : address("test-queue"), messages(0), ignoreDuplicates(false), capacity(0), ackFrequency(1)
+    {
+        addOptions()
+            ("address", qpid::optValue(address, "ADDRESS"), "Address from which to request messages")
+            ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
+            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+            ("prefetch", qpid::optValue(capacity, "N"), "Number of messages that can be prefetched (0 implies no prefetch)")
+            ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
+    }
+};
+
+const string EOS("eos");
+
+class SequenceTracker
+{
+    uint lastSn;
+  public:
+    SequenceTracker() : lastSn(0) {}
+
+    bool isDuplicate(Message& message)
+    {
+        uint sn = message.getHeaders()["sn"];
+        if (lastSn < sn) {
+            lastSn = sn;
+            return false;
+        } else {
+            return true;
+        }
+    }
+};
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
+{
+    Options opts;
+    if (opts.parse(argc, argv)) {
+        try {
+            Connection connection =  Connection::open(opts.url);
+            Session session = connection.newSession();
+            Receiver receiver = session.createReceiver(opts.address);
+            receiver.setCapacity(opts.capacity);
+            Message msg;
+            uint count = 0;
+            SequenceTracker sequenceTracker;
+            Duration timeout = opts.getTimeout();
+            bool done = false;
+            while (!done && receiver.fetch(msg, timeout)) {
+                if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
+                    if (msg.getContent() == EOS) {
+                        done = true;
+                    } else {
+                        ++count;
+                        if (opts.printHeaders) {
+                            if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl;
+                            if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl;
+                            if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
+                            if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
+                            if (msg.getTtl()) std::cout << "TTL: " << msg.getTtl() << std::endl;
+                            if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
+                            if (msg.isRedelivered()) std::cout << "Redelivered: true" << std::endl;
+                            std::cout << "Headers: " << msg.getHeaders() << std::endl;
+                            std::cout << std::endl;
+                        }
+                        std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+                        if (opts.messages && count >= opts.messages) done = true;
+                    }
+                }
+                if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+                    session.acknowledge();
+                }
+                //opts.rejectFrequency??
+            }
+            session.acknowledge();
+            session.close();
+            connection.close();
+            return 0;
+        } catch(const std::exception& error) {
+            std::cerr << "Failure: " << error.what() << std::endl;
+        }
+    }
+    return 1;
+}

Added: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=898296&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Tue Jan 12 12:02:38 2010
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include "TestOptions.h"
+
+#include <fstream>
+#include <iostream>
+
+using namespace qpid::messaging;
+using qpid::framing::Uuid;
+using qpid::sys::AbsTime;
+using qpid::sys::now;
+using qpid::sys::TIME_INFINITE;
+
+typedef std::vector<std::string> string_vector;
+
+using namespace std;
+
+namespace qpid {
+namespace tests {
+
+struct Options : public qpid::Options
+{
+    bool help;
+    std::string url;
+    std::string address;
+    int64_t timeout;
+    uint count;
+    std::string id;
+    std::string replyto;
+    uint sendEos;
+    bool durable;
+    uint ttl;
+    std::string userid;
+    std::string correlationid;
+    string_vector properties;
+    string_vector entries;
+    std::string content;
+    qpid::log::Options log;
+
+    Options(const std::string& argv0=std::string())
+        : qpid::Options("Options"),
+          help(false),
+          url("amqp:tcp:127.0.0.1"),
+          timeout(TIME_INFINITE),
+          count(1),
+          sendEos(0),
+          durable(false),
+          ttl(0),
+          log(argv0)
+    {
+        addOptions()
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+            ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+            ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
+            ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
+            ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
+            ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
+            ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
+            ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+            ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
+            ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
+            ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
+            ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
+            ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+            ("help", qpid::optValue(help), "print this usage statement");
+        add(log);
+    }
+
+    bool parse(int argc, char** argv)
+    {
+        try {
+            qpid::Options::parse(argc, argv);
+            if (address.empty()) throw qpid::Exception("Address must be specified!");
+            qpid::log::Logger::instance().configure(log);
+            if (help) {
+                std::ostringstream msg;
+                std::cout << msg << *this << std::endl << std::endl 
+                          << "Drains messages from the specified address" << std::endl;
+                return false;
+            } else {
+                return true;
+            }
+        } catch (const std::exception& e) {
+            std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+            return false;
+        }
+    }
+
+    static bool nameval(const std::string& in, std::string& name, std::string& value)
+    {
+        std::string::size_type i = in.find("=");
+        if (i == std::string::npos) {
+            name = in;
+            return false;
+        } else {
+            name = in.substr(0, i);
+            if (i+1 < in.size()) {
+                value = in.substr(i+1);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    static void setProperty(Message& message, const std::string& property)
+    {
+        std::string name;
+        std::string value;
+        if (nameval(property, name, value)) {
+            message.getHeaders()[name] = value;
+        } else {
+            message.getHeaders()[name] = Variant();
+        }    
+    }
+
+    void setProperties(Message& message) const
+    {
+        for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            setProperty(message, *i);
+        }
+    }
+
+    void setEntries(MapContent& content) const
+    {
+        for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) {
+            std::string name;
+            std::string value;
+            if (nameval(*i, name, value)) {
+                content[name] = value;
+            } else {
+                content[name] = Variant();
+            }
+        }
+    }
+};
+
+const string EOS("eos");
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
+{
+    Options opts;
+    if (opts.parse(argc, argv)) {
+        try {
+            Connection connection =  Connection::open(opts.url);
+            Session session = connection.newSession();
+            Sender sender = session.createSender(opts.address);
+            Message msg;
+            msg.setDurable(opts.durable);
+            if (opts.ttl) {
+                msg.setTtl(opts.ttl);
+            }
+            if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
+            if (!opts.userid.empty()) msg.setUserId(opts.userid);
+            if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
+            opts.setProperties(msg);
+            std::string content;
+            uint sent = 0;
+            while (getline(std::cin, content)) {
+                msg.setContent(content);
+                msg.getHeaders()["sn"] = ++sent;
+                sender.send(msg);
+            }
+            for (uint i = opts.sendEos; i > 0; --i) {
+                msg.getHeaders()["sn"] = ++sent;
+                msg.setContent(EOS);//TODO: add in ability to send digest or similar
+                sender.send(msg);
+            }
+            session.sync();
+            session.close();
+            connection.close();
+            return 0;
+        } catch(const std::exception& error) {
+            std::cout << "Failed: " << error.what() << std::endl;
+        }
+    }
+    return 1;
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org