You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2011/02/25 17:14:32 UTC

svn commit: r1074611 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/SenderImpl.cpp tests/cluster_tests.py tests/qpid-receive.cpp

Author: gsim
Date: Fri Feb 25 16:14:32 2011
New Revision: 1074611

URL: http://svn.apache.org/viewvc?rev=1074611&view=rev
Log:
QPID-2999: set redelivered on replay

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1074611&r1=1074610&r2=1074611&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Feb 25 16:14:32 2011
@@ -135,6 +135,7 @@ void SenderImpl::sendUnreliable(const qp
 void SenderImpl::replay(const sys::Mutex::ScopedLock&)
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+        i->message.setRedelivered(true);
         sink->send(session, name, *i);
     }
 }
@@ -147,7 +148,7 @@ uint32_t SenderImpl::checkPendingSends(b
 uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
 {
     if (flush) {
-        session.flush(); 
+        session.flush();
         flushed = true;
     } else {
         flushed = false;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1074611&r1=1074610&r2=1074611&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Feb 25 16:14:32 2011
@@ -304,6 +304,36 @@ acl allow all all
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
+    def test_redelivered(self):
+        """Verify that redelivered flag is set correctly on replayed messages"""
+        cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+        url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
+        queue = "my-queue"
+        cluster[0].declare_queue(queue)
+        self.sender = self.popen(
+            ["qpid-send",
+             "--broker", url,
+             "--address", queue,
+             "--sequence=true",
+             "--send-eos=1",
+             "--messages=100000",
+             "--connection-options={reconnect:true}"
+             ])
+        self.receiver = self.popen(
+            ["qpid-receive",
+             "--broker", url,
+             "--address", queue,
+             "--ignore-duplicates",
+             "--check-redelivered",
+             "--connection-options={reconnect:true}",
+             "--forever"
+             ])
+        time.sleep(1)#give sender enough time to have some messages to replay
+        cluster[0].kill()
+        self.sender.wait()
+        self.receiver.wait()
+        cluster[1].kill()
+
     class BlockedSend(Thread):
         """Send a message, send is expected to block.
         Verify that it does block (for a given timeout), then allow

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1074611&r1=1074610&r2=1074611&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Fri Feb 25 16:14:32 2011
@@ -53,6 +53,7 @@ struct Options : public qpid::Options
     bool forever;
     uint messages;
     bool ignoreDuplicates;
+    bool checkRedelivered;
     uint capacity;
     uint ackFrequency;
     uint tx;
@@ -75,6 +76,7 @@ struct Options : public qpid::Options
           forever(false),
           messages(0),
           ignoreDuplicates(false),
+          checkRedelivered(false),
           capacity(1000),
           ackFrequency(100),
           tx(0),
@@ -96,6 +98,7 @@ struct Options : public qpid::Options
             ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
             ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
             ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+            ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
             ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
             ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
@@ -216,6 +219,8 @@ int main(int argc, char ** argv)
                             std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
                         if (opts.messages && count >= opts.messages) done = true;
                     }
+                } else if (opts.checkRedelivered && !msg.getRedelivered()) {
+                    throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
                 }
                 if (opts.tx && (count % opts.tx == 0)) {
                     if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org