You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/07/04 17:30:20 UTC

svn commit: r1499789 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h qpid/ha/Settings.h qpid/ha/StatusCheck.h tests/brokertest.py tests/ha_tests.py

Author: aconway
Date: Thu Jul  4 15:30:19 2013
New Revision: 1499789

URL: http://svn.apache.org/r1499789
Log:
QPID-4944: HA Sporadic failure: test_failover_send_receive

The test fails when run as `ha_tests.py -DDURATION=2`, with:
  AssertionError: Stalled test0 waiting for 248, sent 1228

The problem was a missing call to notify() when a ReplicatingSubscription
skipped a message. That resulted in very long (>1s) delays between skipped
messages, which caused the test to time out.

Changes:
- ReplicatingSubscription::deliver: call notify() when skipping a message to keep the consumer active, and return true for the skipped message.
- Re-enable test_failover_send_receive.
- Increase the default message credit (flowMessages) for replicating subscriptions from 100 to 1000 to match qpid-send.
- Rename ReplicatingSubscription::unacked to unready, which better describes its meaning.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jul  4 15:30:19 2013
@@ -220,12 +220,12 @@ bool ReplicatingSubscription::deliver(
             QPID_LOG(trace, logPrefix << "On backup, skip " <<
                      LogMessageId(*getQueue(), m));
             guard->complete(id); // This will never be acknowledged.
-            result = false;
+            notify();
+            result = true;
         }
         else {
             QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
-            // Only consider unguarded messages for ready status.
-            if (!ready && !isGuarded(l)) unacked += id;
+            if (!ready && !isGuarded(l)) unready += id;
             sendIdEvent(id, l);
             result = ConsumerImpl::deliver(c, m);
         }
@@ -242,7 +242,7 @@ bool ReplicatingSubscription::deliver(
  *@param position: must be <= last position seen by subscription.
  */
 void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
-    if (!ready && isGuarded(l) && unacked.empty()) {
+    if (!ready && isGuarded(l) && unready.empty()) {
         ready = true;
         sys::Mutex::ScopedUnlock u(lock);
         // Notify Primary that a subscription is ready.
@@ -274,7 +274,7 @@ void ReplicatingSubscription::acknowledg
     guard->complete(id);
     {
         Mutex::ScopedLock l(lock);
-        unacked -= id;
+        unready -= id;
         checkReady(l);
     }
     ConsumerImpl::acknowledged(r);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jul  4 15:30:19 2013
@@ -129,7 +129,7 @@ class ReplicatingSubscription : public b
     QueuePosition position;
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
     ReplicationIdSet skip;      // Messages already on backup will be skipped.
-    ReplicationIdSet unacked;   // Replicated but un-acknowledged.
+    ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
     bool ready;
     bool cancelled;
     BrokerInfo info;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Thu Jul  4 15:30:19 2013
@@ -38,7 +38,7 @@ class Settings
   public:
     Settings() : cluster(false), queueReplication(false),
                  replicateDefault(NONE), backupTimeout(10*sys::TIME_SEC),
-                 flowMessages(100), flowBytes(0)
+                 flowMessages(1000), flowBytes(0)
     {}
 
     bool cluster;               // True if we are a cluster member.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Thu Jul  4 15:30:19 2013
@@ -33,7 +33,7 @@
 namespace qpid {
 namespace ha {
 
-// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
 // against bad promotion if there are READY brokers when this broker starts.
 // It will not help the situation where brokers became READY after this one starts.
 //

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Jul  4 15:30:19 2013
@@ -578,7 +578,7 @@ class NumberedReceiver(Thread):
     """
     def __init__(self, broker, sender=None, queue="test-queue",
                  connection_options=RECONNECT_OPTIONS,
-                 failover_updates=True, url=None):
+                 failover_updates=True, url=None, args=[]):
         """
         sender: enable flow control. Call sender.received(n) for each message received.
         """
@@ -591,6 +591,7 @@ class NumberedReceiver(Thread):
                "--forever"
                ]
         if failover_updates: cmd += [ "--failover-updates" ]
+        cmd += args
         self.receiver = self.test.popen(
             cmd, expect=EXPECT_RUNNING, stdout=PIPE)
         self.lock = Lock()

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jul  4 15:30:19 2013
@@ -927,9 +927,7 @@ class LongTests(HaBrokerTest):
         if d: return float(d)*60
         else: return 3                  # Default is to be quick
 
-    # FIXME aconway 2013-06-27: skip this test pending a fix for
-    # https://issues.apache.org/jira/browse/QPID-4944
-    def skip_test_failover_send_receive(self):
+    def test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
         brokers = HaCluster(self, 3)
 
@@ -937,7 +935,7 @@ class LongTests(HaBrokerTest):
         n = 10
         senders = [NumberedSender(brokers[0], url=brokers.url,
                                   max_depth=1024, failover_updates=False,
-                                 queue="test%s"%(i)) for i in xrange(n)]
+                                  queue="test%s"%(i)) for i in xrange(n)]
         receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
                                       failover_updates=False,
                                       queue="test%s"%(i)) for i in xrange(n)]
@@ -966,7 +964,7 @@ class LongTests(HaBrokerTest):
                     # one or two backups are running,
                     for s in senders: s.sender.assert_running()
                     for r in receivers: r.receiver.assert_running()
-                    checkpoint = [ r.received+100 for r in receivers ]
+                    checkpoint = [ r.received+10 for r in receivers ]
                     victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
                     if victim == primary:
                         # Don't kill primary till it is active and the next
@@ -982,7 +980,7 @@ class LongTests(HaBrokerTest):
                     # Make sure we are not stalled
                     map(wait_passed, receivers, checkpoint)
                     # Run another checkpoint to ensure things work in this configuration
-                    checkpoint = [ r.received+100 for r in receivers ]
+                    checkpoint = [ r.received+10 for r in receivers ]
                     map(wait_passed, receivers, checkpoint)
                     i += 1
             except:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org