You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/05/12 15:17:00 UTC

svn commit: r655494 - in /incubator/qpid/trunk/qpid/cpp/src/tests: Makefile.am consume.cpp publish.cpp

Author: gsim
Date: Mon May 12 06:16:59 2008
New Revision: 655494

URL: http://svn.apache.org/viewvc?rev=655494&view=rev
Log:
Couple of extra simple tests for publishing and consuming in generic fashion.


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=655494&r1=655493&r2=655494&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon May 12 06:16:59 2008
@@ -127,7 +127,7 @@
   topic_publisher	
 #  echo_service
 
-check_PROGRAMS += $(testprogs) interop_runner
+check_PROGRAMS += $(testprogs) interop_runner publish consume
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test 
 
@@ -187,6 +187,14 @@
   TestOptions.h
 interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
 
+publish_SOURCES = publish.cpp
+publish_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+
+consume_SOURCES = consume.cpp
+consume_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+
+
+
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
 LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest

Added: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp?rev=655494&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp Mon May 12 06:16:59 2008
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "TestOptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+typedef std::vector<std::string> StringSet;
+
+struct Args : public qpid::TestOptions {
+    uint count;
+    uint ack;
+    string queue;
+
+    Args() : count(0), ack(1)
+    {
+        addOptions()
+            ("count", optValue(count, "N"), "number of messages to publish")
+            ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
+            ("queue", optValue(queue, "<exchange name>"), "queue to consume from");
+    }
+};
+
+Args opts;
+
+struct Client 
+{
+    Connection connection;
+    Session session;
+
+    Client() 
+    {
+        opts.open(connection);
+        session = connection.newSession(ASYNC);
+    }
+
+    void consume()
+    {
+        
+        SubscriptionManager subs(session);
+        LocalQueue lq(AckPolicy(opts.ack));
+        subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
+        subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED,
+                            false);
+        subs.subscribe(lq, opts.queue);
+        Message msg;
+        for (size_t i = 0; i < opts.count; ++i) {
+            msg=lq.pop();
+            std::cout << "Received: " << msg.getData().substr(0, 10) << "..." << std::endl;
+        }
+        if (opts.ack != 0)
+            subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+    }
+
+    ~Client() 
+    {
+        try{
+            session.close();
+            connection.close();
+        } catch(const std::exception& e) {
+            std::cout << e.what() << std::endl;
+        }
+    }
+};
+
+int main(int argc, char** argv)
+{
+    try {
+        opts.parse(argc, argv);
+        Client client;
+        client.consume();
+        return 0;
+    } catch(const std::exception& e) {
+	std::cout << e.what() << std::endl;
+    }
+    return 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp?rev=655494&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp Mon May 12 06:16:59 2008
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "TestOptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+typedef std::vector<std::string> StringSet;
+
+struct Args : public qpid::TestOptions {
+    uint size;
+    uint count;
+    bool durable;
+    string destination;
+    string routingKey;
+
+    Args() : size(256), count(1000), durable(true)
+    {
+        addOptions()
+            ("size", optValue(size, "N"), "message size")
+            ("count", optValue(count, "N"), "number of messages to publish")
+            ("durable", optValue(durable, "yes|no"), "use durable messages")
+            ("destination", optValue(destination, "<exchange name>"), "destination to publish to")
+            ("routing-key", optValue(routingKey, "<key>"), "routing key to publish with");
+    }
+};
+
+Args opts;
+
+struct Client 
+{
+    Connection connection;
+    Session session;
+
+    Client() 
+    {
+        opts.open(connection);
+        session = connection.newSession(ASYNC);
+    }
+
+    void publish()
+    {
+        Message msg(string(opts.size, 'X'), opts.routingKey);
+        if (opts.durable)
+            msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+        
+        for (uint i = 0; i < opts.count; i++) {
+            const_cast<std::string&>(msg.getData()).replace(0, sizeof(uint32_t), 
+                                                            reinterpret_cast<const char*>(&i), sizeof(uint32_t));
+            session.messageTransfer(arg::destination=opts.destination,
+                                    arg::content=msg,
+                                    arg::acceptMode=1);
+        }
+        session.sync();
+    }
+
+    ~Client() 
+    {
+        try{
+            session.close();
+            connection.close();
+        } catch(const std::exception& e) {
+            std::cout << e.what() << std::endl;
+        }
+    }
+};
+
+int main(int argc, char** argv)
+{
+    try {
+        opts.parse(argc, argv);
+        Client client;
+        client.publish();
+        return 0;
+    } catch(const std::exception& e) {
+	std::cout << e.what() << std::endl;
+    }
+    return 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/publish.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date