You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/01/20 13:17:35 UTC

svn commit: r901153 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/SessionImpl.cpp tests/qpid_recv.cpp tests/qpid_send.cpp

Author: gsim
Date: Wed Jan 20 12:17:34 2010
New Revision: 901153

URL: http://svn.apache.org/viewvc?rev=901153&view=rev
Log:
QPID-664: added support for testing transactions to qpid-send and qpid-recv, fixed bug in rollback.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Jan 20 12:17:34 2010
@@ -398,7 +398,7 @@
 
 void SessionImpl::acknowledgeImpl()
 {
-    incoming.accept();
+    if (!transactional) incoming.accept();
 }
 
 void SessionImpl::rejectImpl(qpid::messaging::Message& m)

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Wed Jan 20 12:17:34 2010
@@ -48,12 +48,15 @@
     bool help;
     std::string url;
     std::string address;
+    std::string connectionOptions;
     int64_t timeout;
     bool forever;
     uint messages;
     bool ignoreDuplicates;
     uint capacity;
     uint ackFrequency;
+    uint tx;
+    uint rollbackFrequency;
     bool printHeaders;
     qpid::log::Options log;
 
@@ -67,18 +70,23 @@
           ignoreDuplicates(false),
           capacity(0),
           ackFrequency(1),
+          tx(0),
+          rollbackFrequency(0),
           printHeaders(false),
           log(argv0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
             ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+            ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
             ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
             ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
             ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
             ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
-            ("credit-window", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+            ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
             ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+            ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+            ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
             ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
@@ -111,25 +119,6 @@
     }
 };
 
-struct Args : public qpid::TestOptions
-{
-    string address;
-    uint messages;
-    bool ignoreDuplicates;
-    uint capacity;
-    uint ackFrequency;
-
-    Args() : address("test-queue"), messages(0), ignoreDuplicates(false), capacity(0), ackFrequency(1)
-    {
-        addOptions()
-            ("address", qpid::optValue(address, "ADDRESS"), "Address from which to request messages")
-            ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
-            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
-            ("prefetch", qpid::optValue(capacity, "N"), "Number of messages that can be prefetched (0 implies no prefetch)")
-            ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
-    }
-};
-
 const string EOS("eos");
 
 class SequenceTracker
@@ -159,12 +148,17 @@
     Options opts;
     if (opts.parse(argc, argv)) {
         try {
-            Connection connection =  Connection::open(opts.url);
-            Session session = connection.newSession();
+            Variant::Map connectionOptions;
+            if (opts.connectionOptions.size()) {
+                parseOptionString(opts.connectionOptions, connectionOptions);
+            }
+            Connection connection =  Connection::open(opts.url, connectionOptions);
+            Session session = connection.newSession(opts.tx > 0);
             Receiver receiver = session.createReceiver(opts.address);
             receiver.setCapacity(opts.capacity);
             Message msg;
             uint count = 0;
+            uint txCount = 0;
             SequenceTracker sequenceTracker;
             Duration timeout = opts.getTimeout();
             bool done = false;
@@ -189,12 +183,26 @@
                         if (opts.messages && count >= opts.messages) done = true;
                     }
                 }
-                if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+                if (opts.tx && (count % opts.tx == 0)) {
+                    if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+                        session.rollback();
+                    } else {
+                        session.commit();
+                    }
+                } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
                     session.acknowledge();
                 }
                 //opts.rejectFrequency??
             }
-            session.acknowledge();
+            if (opts.tx) {
+                if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+                    session.rollback();
+                } else {
+                    session.commit();
+                }
+            } else {
+                session.acknowledge();
+            }
             session.close();
             connection.close();
             return 0;

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Wed Jan 20 12:17:34 2010
@@ -47,6 +47,7 @@
 {
     bool help;
     std::string url;
+    std::string connectionOptions;
     std::string address;
     int64_t timeout;
     uint count;
@@ -60,6 +61,8 @@
     string_vector properties;
     string_vector entries;
     std::string content;
+    uint tx;
+    uint rollbackFrequency;
     qpid::log::Options log;
 
     Options(const std::string& argv0=std::string())
@@ -71,11 +74,14 @@
           sendEos(0),
           durable(false),
           ttl(0),
+          tx(0),
+          rollbackFrequency(0),
           log(argv0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
             ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+            ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
             ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
             ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
             ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
@@ -88,6 +94,8 @@
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
             ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+            ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+            ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -172,8 +180,12 @@
     Options opts;
     if (opts.parse(argc, argv)) {
         try {
-            Connection connection =  Connection::open(opts.url);
-            Session session = connection.newSession();
+            Variant::Map connectionOptions;
+            if (opts.connectionOptions.size()) {
+                parseOptionString(opts.connectionOptions, connectionOptions);
+            }
+            Connection connection =  Connection::open(opts.url, connectionOptions);
+            Session session = connection.newSession(opts.tx > 0);
             Sender sender = session.createSender(opts.address);
             Message msg;
             msg.setDurable(opts.durable);
@@ -186,16 +198,31 @@
             opts.setProperties(msg);
             std::string content;
             uint sent = 0;
+            uint txCount = 0;
             while (getline(std::cin, content)) {
                 msg.setContent(content);
                 msg.getHeaders()["sn"] = ++sent;
                 sender.send(msg);
+                if (opts.tx && (sent % opts.tx == 0)) {
+                    if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+                        session.rollback();
+                    } else {
+                        session.commit();
+                    }
+                }                
             }
             for (uint i = opts.sendEos; i > 0; --i) {
                 msg.getHeaders()["sn"] = ++sent;
                 msg.setContent(EOS);//TODO: add in ability to send digest or similar
                 sender.send(msg);
             }
+            if (opts.tx) {
+                if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+                    session.rollback();
+                } else {
+                    session.commit();
+                }
+            }
             session.sync();
             session.close();
             connection.close();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org