You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/07/04 17:30:20 UTC
svn commit: r1499789 - in /qpid/trunk/qpid/cpp/src:
qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h
qpid/ha/Settings.h qpid/ha/StatusCheck.h tests/brokertest.py
tests/ha_tests.py
Author: aconway
Date: Thu Jul 4 15:30:19 2013
New Revision: 1499789
URL: http://svn.apache.org/r1499789
Log:
QPID-4944: HA Sporadic failure: test_failover_send_receive
The test fails sporadically when run as: ha_tests.py -DDURATION=2
AssertionError: Stalled test0 waiting for 248, sent 1228
The problem was a missing call to notify() when a ReplicatingSubscription
skipped a message. That resulted in very long (>1s) delays between skipped
messages which caused the test to time out.
Changes:
- ReplicatingSubscription::deliver now calls notify() after skipping a message, to keep the consumer active.
- Re-enable test_failover_send_receive.
- Increase default credit for replicating subscription to match qpid-send.
- Rename ReplicatingSubscription::unacked to unready, which describes its meaning more clearly.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
qpid/trunk/qpid/cpp/src/tests/brokertest.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jul 4 15:30:19 2013
@@ -220,12 +220,12 @@ bool ReplicatingSubscription::deliver(
QPID_LOG(trace, logPrefix << "On backup, skip " <<
LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
- result = false;
+ notify();
+ result = true;
}
else {
QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
- // Only consider unguarded messages for ready status.
- if (!ready && !isGuarded(l)) unacked += id;
+ if (!ready && !isGuarded(l)) unready += id;
sendIdEvent(id, l);
result = ConsumerImpl::deliver(c, m);
}
@@ -242,7 +242,7 @@ bool ReplicatingSubscription::deliver(
*@param position: must be <= last position seen by subscription.
*/
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
- if (!ready && isGuarded(l) && unacked.empty()) {
+ if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
@@ -274,7 +274,7 @@ void ReplicatingSubscription::acknowledg
guard->complete(id);
{
Mutex::ScopedLock l(lock);
- unacked -= id;
+ unready -= id;
checkReady(l);
}
ConsumerImpl::acknowledged(r);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jul 4 15:30:19 2013
@@ -129,7 +129,7 @@ class ReplicatingSubscription : public b
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skip; // Messages already on backup will be skipped.
- ReplicationIdSet unacked; // Replicated but un-acknowledged.
+ ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool ready;
bool cancelled;
BrokerInfo info;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Thu Jul 4 15:30:19 2013
@@ -38,7 +38,7 @@ class Settings
public:
Settings() : cluster(false), queueReplication(false),
replicateDefault(NONE), backupTimeout(10*sys::TIME_SEC),
- flowMessages(100), flowBytes(0)
+ flowMessages(1000), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Thu Jul 4 15:30:19 2013
@@ -33,7 +33,7 @@
namespace qpid {
namespace ha {
-// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
// against bad promotion if there are READY brokers when this broker starts.
// It will not help the situation where brokers became READY after this one starts.
//
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Jul 4 15:30:19 2013
@@ -578,7 +578,7 @@ class NumberedReceiver(Thread):
"""
def __init__(self, broker, sender=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None):
+ failover_updates=True, url=None, args=[]):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -591,6 +591,7 @@ class NumberedReceiver(Thread):
"--forever"
]
if failover_updates: cmd += [ "--failover-updates" ]
+ cmd += args
self.receiver = self.test.popen(
cmd, expect=EXPECT_RUNNING, stdout=PIPE)
self.lock = Lock()
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1499789&r1=1499788&r2=1499789&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jul 4 15:30:19 2013
@@ -927,9 +927,7 @@ class LongTests(HaBrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
- # FIXME aconway 2013-06-27: skip this test pending a fix for
- # https://issues.apache.org/jira/browse/QPID-4944
- def skip_test_failover_send_receive(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
brokers = HaCluster(self, 3)
@@ -937,7 +935,7 @@ class LongTests(HaBrokerTest):
n = 10
senders = [NumberedSender(brokers[0], url=brokers.url,
max_depth=1024, failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
+ queue="test%s"%(i)) for i in xrange(n)]
receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
@@ -966,7 +964,7 @@ class LongTests(HaBrokerTest):
# one or two backups are running,
for s in senders: s.sender.assert_running()
for r in receivers: r.receiver.assert_running()
- checkpoint = [ r.received+100 for r in receivers ]
+ checkpoint = [ r.received+10 for r in receivers ]
victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
if victim == primary:
# Don't kill primary till it is active and the next
@@ -982,7 +980,7 @@ class LongTests(HaBrokerTest):
# Make sure we are not stalled
map(wait_passed, receivers, checkpoint)
# Run another checkpoint to ensure things work in this configuration
- checkpoint = [ r.received+100 for r in receivers ]
+ checkpoint = [ r.received+10 for r in receivers ]
map(wait_passed, receivers, checkpoint)
i += 1
except:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org