You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/09/25 22:49:34 UTC

svn commit: r1390123 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Tue Sep 25 20:49:33 2012
New Revision: 1390123

URL: http://svn.apache.org/viewvc?rev=1390123&view=rev
Log:
QPID-4325: HA Starting from persistent store

When re-starting a persistent HA cluster, the broker that becomes primary should
keep its store data while all the backup brokers should discard their store data
and catch up from the primary. Backups cannot simply use their own stores
because sequence numbers of stored messages will not match on all brokers. The
backup erases individual queues and exchanges as the catch-up process gets to
them.

Added:
    qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py   (with props)
    qpid/trunk/qpid/cpp/src/tests/ha_test.py   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Sep 25 20:49:33 2012
@@ -174,6 +174,7 @@ public:
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
     qpid::framing::FieldTable& getArgs() { return args; }
+    const qpid::framing::FieldTable& getArgs() const { return args; }
 
     QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
     QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h Tue Sep 25 20:49:33 2012
@@ -43,6 +43,9 @@ public:
     virtual void bind(const std::string& queue,
                       const std::string& routingKey,
                       qpid::framing::FieldTable& args) = 0;
+
+    virtual std::string getName() const = 0;
+
     virtual ~RecoverableExchange() {};
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Sep 25 20:49:33 2012
@@ -82,6 +82,7 @@ public:
     RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
     void setPersistenceId(uint64_t id);
     void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+    string getName() const { return exchange->getName(); }
 };
 
 class RecoverableConfigImpl : public RecoverableConfig

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h Tue Sep 25 20:49:33 2012
@@ -43,12 +43,14 @@ class AlternateExchangeSetter
 
     AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
 
+    /** If altEx is already known, call setter(altEx) now else save for later */
     void setAlternate(const std::string& altEx, const SetFunction& setter) {
         broker::Exchange::shared_ptr ex = exchanges.find(altEx);
         if (ex) setter(ex);     // Set immediately.
         else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
     }
 
+    /** Add an exchange and call any setters that are waiting for it. */
     void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
         // Update the setters for this exchange
         std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Sep 25 20:49:33 2012
@@ -41,6 +41,7 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include <boost/bind.hpp>
 #include <algorithm>
 #include <sstream>
 #include <iostream>
@@ -313,11 +314,11 @@ void BrokerReplicator::doEventQueueDecla
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
             QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
-            broker.getQueues().destroy(name);
+            broker.deleteQueue(name, userId, remoteHost);
             stopQueueReplicator(name);
         }
         settings.populate(args, settings.storeSettings);
-        std::pair<boost::shared_ptr<Queue>, bool> result =
+        CreateQueueResult result =
             broker.createQueue(
                 name,
                 settings,
@@ -325,7 +326,7 @@ void BrokerReplicator::doEventQueueDecla
                 values[ALTEX].asString(),
                 userId,
                 remoteHost);
-        assert(result.second);  // Should be true since we destroyed existing queue above
+        assert(result.second);
         startQueueReplicator(result.first);
     }
 }
@@ -361,12 +362,14 @@ void BrokerReplicator::doEventExchangeDe
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
-            broker.getExchanges().destroy(name);
+            broker.deleteExchange(name, userId, remoteHost);
             QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
         }
-        boost::shared_ptr<Exchange> exchange =
-            createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
-        assert(exchange);
+        CreateExchangeResult result = createExchange(
+            name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+            values[ALTEX].asString());
+        replicatedExchanges.insert(name);
+        assert(result.second);
     }
 }
 
@@ -380,6 +383,7 @@ void BrokerReplicator::doEventExchangeDe
     } else {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
         broker.deleteExchange(name, userId, remoteHost);
+        replicatedExchanges.erase(name);
     }
 }
 
@@ -399,7 +403,7 @@ void BrokerReplicator::doEventBind(Varia
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->bind(queue, key, &args);
+        queue->bind(exchange, key, args);
     }
 }
 
@@ -454,14 +458,20 @@ void BrokerReplicator::doResponseQueue(V
         return;
     string name(values[NAME].asString());
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
+    if (broker.getQueues().find(name)) { // Already exists
+        if (findQueueReplicator(name))
+            return; // Already replicated
+        else {
+            QPID_LOG(debug, logPrefix << "Deleting queue to make way for replica: " << name);
+            broker.deleteQueue(name, userId, remoteHost);
+        }
+    }
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
-    boost::shared_ptr<Queue> queue =
+    CreateQueueResult result =
         createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
                     getAltExchange(values[ALTEXCHANGE]));
-    // It is normal for the queue to already exist if we are failing over.
-    if (queue) startQueueReplicator(queue);
-    else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+    if (result.second) startQueueReplicator(result.first);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -469,13 +479,22 @@ void BrokerReplicator::doResponseExchang
     if (!replicationTest.replicateLevel(argsMap)) return;
     string name = values[NAME].asString();
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
+    if (broker.getExchanges().find(name)) {
+        if (replicatedExchanges.find(name) != replicatedExchanges.end())
+            return; // Already replicated
+        else {
+            QPID_LOG(debug, logPrefix << "Deleting exchange to make way for replica: "
+                     << name);
+            broker.deleteExchange(name, userId, remoteHost);
+        }
+    }
     framing::FieldTable args;
     qpid::amqp_0_10::translate(argsMap, args);
-    boost::shared_ptr<Exchange> exchange = createExchange(
+    CreateExchangeResult result = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));
-    // It is normal for the exchange to already exist if we are failing over.
-    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
+    assert(result.second);
+    replicatedExchanges.insert(name);
 }
 
 namespace {
@@ -563,7 +582,7 @@ void BrokerReplicator::stopQueueReplicat
     }
 }
 
-boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
     const std::string& name,
     bool durable,
     bool autodelete,
@@ -572,7 +591,7 @@ boost::shared_ptr<Queue> BrokerReplicato
 {
     QueueSettings settings(durable, autodelete);
     settings.populate(arguments, settings.storeSettings);
-    std::pair<boost::shared_ptr<Queue>, bool> result =
+    CreateQueueResult result =
         broker.createQueue(
             name,
             settings,
@@ -580,24 +599,22 @@ boost::shared_ptr<Queue> BrokerReplicato
             string(), // Set alternate exchange below
             userId,
             remoteHost);
-    if (result.second) {
-        if (!alternateExchange.empty()) {
-            alternates.setAlternate(
-                alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
-        }
-        return result.first;
+
+    if (!alternateExchange.empty()) {
+        alternates.setAlternate(
+            alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
     }
-    else return  boost::shared_ptr<Queue>();
+    return result;
 }
 
-boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
     const std::string& name,
     const std::string& type,
     bool durable,
     const qpid::framing::FieldTable& args,
     const std::string& alternateExchange)
 {
-    std::pair<boost::shared_ptr<Exchange>, bool> result =
+    CreateExchangeResult result =
         broker.createExchange(
             name,
             type,
@@ -606,15 +623,13 @@ boost::shared_ptr<Exchange> BrokerReplic
             args,
             userId,
             remoteHost);
-    if (result.second) {
-        alternates.addExchange(result.first);
-        if (!alternateExchange.empty()) {
-            alternates.setAlternate(
-                alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
-        }
-        return result.first;
+
+    alternates.addExchange(result.first);
+    if (!alternateExchange.empty()) {
+        alternates.setAlternate(
+            alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
     }
-    else return  boost::shared_ptr<Exchange>();
+    return result;
 }
 
 bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
@@ -623,4 +638,5 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
+
 }} // namespace broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Sep 25 20:49:33 2012
@@ -78,6 +78,8 @@ class BrokerReplicator : public broker::
 
   private:
     typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+    typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+    typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
 
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
 
@@ -98,14 +100,14 @@ class BrokerReplicator : public broker::
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
     void stopQueueReplicator(const std::string& name);
 
-    boost::shared_ptr<broker::Queue> createQueue(
+    CreateQueueResult createQueue(
         const std::string& name,
         bool durable,
         bool autodelete,
         const qpid::framing::FieldTable& arguments,
         const std::string& alternateExchange);
 
-    boost::shared_ptr<broker::Exchange> createExchange(
+    CreateExchangeResult createExchange(
         const std::string& name,
         const std::string& type,
         bool durable,
@@ -121,6 +123,8 @@ class BrokerReplicator : public broker::
     bool initialized;
     AlternateExchangeSetter alternates;
     qpid::Address primary;
+    typedef std::set<std::string> StringSet;
+    StringSet replicatedExchanges; // exchanges that have been replicated.
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Tue Sep 25 20:49:33 2012
@@ -61,7 +61,10 @@ QueueGuard::QueueGuard(broker::Queue& q,
     range = QueueRange(q);
 }
 
-QueueGuard::~QueueGuard() { cancel(); }
+QueueGuard::~QueueGuard() {  // Destructor: emit the debug log entry, then run cancel().
+    QPID_LOG(debug, logPrefix << "Cancelled");
+    cancel();
+}
 
 // NOTE: Called with message lock held.
 void QueueGuard::enqueued(const Message& m) {
@@ -97,7 +100,6 @@ void QueueGuard::completeRange(Delayed::
 }
 
 void QueueGuard::cancel() {
-    QPID_LOG(debug, logPrefix << "Cancelled");
     queue.removeObserver(observer);
     Delayed removed;
     {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Sep 25 20:49:33 2012
@@ -67,6 +67,7 @@ QueueReplicator::QueueReplicator(HaBroke
       logPrefix("Backup queue "+q->getName()+": "),
       queue(q), link(l), brokerInfo(hb.getBrokerInfo())
 {
+    args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Tue Sep 25 20:49:33 2012
@@ -20,6 +20,7 @@
  */
 #include "ReplicationTest.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
 
 namespace qpid {
@@ -71,5 +72,10 @@ bool ReplicationTest::isReplicated(Repli
     return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
 }
 
+bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex)
+{   // True when replicateLevel() of the exchange's arguments is at least `level`.
+    return replicateLevel(ex.getArgs()) >= level;
+}
+
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Tue Sep 25 20:49:33 2012
@@ -30,6 +30,7 @@ namespace qpid {
 
 namespace broker {
 class Queue;
+class Exchange;
 }
 
 namespace framing {
@@ -59,6 +60,7 @@ class ReplicationTest
     bool isReplicated(ReplicateLevel level,
                       const framing::FieldTable& args, bool autodelete, bool exclusive);
     bool isReplicated(ReplicateLevel level, const broker::Queue&);
+    bool isReplicated(ReplicateLevel level, const broker::Exchange&);
   private:
     ReplicateLevel replicateDefault;
 };

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Sep 25 20:49:33 2012
@@ -348,7 +348,9 @@ EXTRA_DIST +=								\
   run_msg_group_tests							\
   ipv6_test								\
   run_ha_tests								\
+  ha_test.py								\
   ha_tests.py								\
+  ha_store_tests.py							\
   test_env.ps1.in
 
 check_LTLIBRARIES += libdlclose_noop.la

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Sep 25 20:49:33 2012
@@ -425,8 +425,8 @@ class Cluster:
         self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
         return self._brokers[-1]
 
-    def ready(self):
-        for b in self: b.ready()
+    def ready(self,  timeout=30, **kwargs):  # NOTE(review): timeout is accepted but not passed to b.ready() — confirm intended
+        for b in self: b.ready(**kwargs)
 
     def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
         for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)

Added: qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py?rev=1390123&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py Tue Sep 25 20:49:33 2012
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module contains tests for HA functionality that requires a store.
+It is not included as part of "make check" since it will not function
+without a store. Currently it can be run from a build of the message
+store.
+"""
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.datatypes import uuid4
+from brokertest import *
+from ha_test import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent
+from uuid import UUID
+
+
+class StoreTests(BrokerTest):
+    """Test for HA with persistence."""
+
+    def test_store_recovery(self):
+        """Verify basic store and recover functionality"""
+        cluster = HaCluster(self, 2)
+        sn = cluster[0].connect().session()
+        s = sn.sender("qq;{create:always,node:{durable:true}}")
+        sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}")
+        s.send(Message("foo", durable=True))
+        s.send(Message("bar", durable=True))
+        sk.send(Message("baz", durable=True))
+        r = cluster[0].connect().session().receiver("qq")
+        self.assertEqual(r.fetch().content, "foo")
+        r.session.acknowledge()
+        # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
+        # the dequeue operation on qq.
+        s.send(Message("flush", durable=True))
+
+        def verify(broker, x_count):  # Check qq holds x_count "x" messages, send one more via xx/k, re-check.
+            sn = broker.connect().session()
+            assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
+            sn.sender("xx/k").send(Message("x", durable=True))
+            assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
+
+        verify(cluster[0], 0)
+        cluster.bounce(0, promote_next=False)
+        cluster[0].promote()
+        cluster[0].wait_status("active")
+        verify(cluster[0], 1)
+        cluster.kill(0, promote_next=False)
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        verify(cluster[1], 2)
+        cluster.bounce(1, promote_next=False)
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        verify(cluster[1], 3)
+
+    def test_catchup_store(self):
+        """Verify that a backup erases queue data from store recovery before
+        doing catch-up from the primary."""
+        cluster = HaCluster(self, 2)
+        sn = cluster[0].connect().session()
+        s1 = sn.sender("q1;{create:always,node:{durable:true}}")
+        for m in ["foo","bar"]: s1.send(Message(m, durable=True))
+        s2 = sn.sender("q2;{create:always,node:{durable:true}}")
+        sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}")
+        sk2.send(Message("hello", durable=True))
+        # Wait for backup to catch up.
+        cluster[1].assert_browse_backup("q1", ["foo","bar"]) 
+        cluster[1].assert_browse_backup("q2", ["hello"])
+
+        # Make changes that the backup doesn't see
+        cluster.kill(1, promote_next=False)
+        time.sleep(1)           # FIXME aconway 2012-09-25: fixed delay with no recorded reason; presumably lets the primary notice the backup is gone — replace with an explicit wait.
+        r1 = cluster[0].connect().session().receiver("q1")
+        for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
+        r1.session.acknowledge()
+        for m in ["x","y","z"]: s1.send(Message(m, durable=True))
+        # Use old connection to unbind
+        us = cluster[0].connect_old().session(str(uuid4()))
+        us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2")
+        us.exchange_bind(exchange="ex", binding_key="k1", queue="q1")
+        # Restart both brokers from store to get inconsistent sequence numbering.
+        cluster.bounce(0, promote_next=False)
+        cluster[0].promote()
+        cluster[0].wait_status("active")
+        cluster.restart(1)
+        cluster[1].wait_status("ready")
+
+        # Verify state
+        cluster[0].assert_browse("q1",  ["x","y","z"])
+        cluster[1].assert_browse_backup("q1",  ["x","y","z"])
+        sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+        sn.sender("ex/k1").send("boo")
+        cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])  # NOTE(review): cluster[0] was promoted above and its other checks use assert_browse — confirm assert_browse_backup is intended
+        cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
+        sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped.
+        sn.sender("q2").send("end")    # mark the end of the queue for assert_browse
+        cluster[0].assert_browse("q2", ["hello", "end"])
+        cluster[1].assert_browse_backup("q2", ["hello", "end"])
+
+if __name__ == "__main__":  # Script entry point: re-run this module under qpid-python-test when the HA tool is available.
+    shutil.rmtree("brokertest.tmp", True)  # Second argument is ignore_errors: a missing directory is not an error.
+    qpid_ha = os.getenv("QPID_HA_EXEC")
+    if  qpid_ha and os.path.exists(qpid_ha):
+        os.execvp("qpid-python-test",  # Replaces the current process; extra command-line arguments are passed through.
+                  ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:])
+    else:
+        print "Skipping ha_store_tests, %s not available"%(qpid_ha)

Propchange: qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/ha_store_tests.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1390123&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Tue Sep 25 20:49:33 2012
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.datatypes import uuid4
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent
+from uuid import UUID
+
+log = getLogger(__name__)
+
+class QmfAgent(object):
+    """Access to a QMF broker agent."""
+    def __init__(self, address, **kwargs):
+        self._connection = Connection.establish(
+            address, client_properties={"qpid.ha-admin":1}, **kwargs)
+        self._agent = BrokerAgent(self._connection)
+
+    def __getattr__(self, name):
+        a = getattr(self._agent, name)
+        return a
+
+class Credentials(object):
+    """SASL credentials: username, password, and mechanism"""
+    def __init__(self, username, password, mechanism):
+        (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+    def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+    def tuple(self): return (self.username, self.password, self.mechanism)
+
+    def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
+class HaBroker(Broker):
+    """Start a broker with HA enabled
+    @param client_credentials: Credentials object (username, password, mechanism) used by admin clients started by the HaBroker.
+    """
+    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+                 client_credentials=None, **kwargs):
+        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        args = copy(args)
+        args += ["--load-module", BrokerTest.ha_lib,
+                 "--log-enable=debug+:ha::",
+                 # FIXME aconway 2012-02-13: workaround slow link failover.
+                 "--link-maintenace-interval=0.1",
+                 "--ha-cluster=%s"%ha_cluster]
+        if ha_replicate is not None:
+            args += [ "--ha-replicate=%s"%ha_replicate ]
+        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
+        Broker.__init__(self, test, args, **kwargs)
+        self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+        assert os.path.exists(self.qpid_ha_path)
+        self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+        assert os.path.exists(self.qpid_config_path)
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+        self.qpid_ha_script=import_script(self.qpid_ha_path)
+        self._agent = None
+        self.client_credentials = client_credentials
+
+    def __str__(self): return Broker.__str__(self)
+
+    def qpid_ha(self, args):
+        cred = self.client_credentials
+        url = self.host_port()
+        if cred:
+            url =cred.add_user(url)
+            args = args + ["--sasl-mechanism", cred.mechanism]
+        self.qpid_ha_script.main_except(["", "-b", url]+args)
+
+    def promote(self): self.ready(); self.qpid_ha(["promote"])
+    def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+    def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
+    def agent(self):
+        if not self._agent:
+            cred = self.client_credentials
+            if cred:
+                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+            else:
+                self._agent = QmfAgent(self.host_port())
+        return self._agent
+
+    def ha_status(self):
+        hb = self.agent().getHaBroker()
+        hb.update()
+        return hb.status
+
+    def wait_status(self, status):
+        def try_get_status():
+            self._status = "<unknown>"
+            # Ignore ConnectionError, the broker may not be up yet.
+            try:
+                self._status = self.ha_status()
+                return self._status == status;
+            except ConnectionError: return False
+        assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+            self, status, self._status)
+
+    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+    def qpid_config(self, args):
+        assert subprocess.call(
+            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
+
+    def config_replicate(self, from_broker, queue):
+        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
+
+    def config_declare(self, queue, replication):
+        self.qpid_config(["add", "queue", queue, "--replicate", replication])
+
+    def connect_admin(self, **kwargs):
+        cred = self.client_credentials
+        if cred:
+            return Broker.connect(
+                self, client_properties={"qpid.ha-admin":1},
+                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+                **kwargs)
+        else:
+            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+
+    def wait_backup(self, address):
+        """Wait for address to become valid on a backup broker."""
+        bs = self.connect_admin().session()
+        try: wait_address(bs, address)
+        finally: bs.connection.close()
+
+    def assert_browse(self, queue, expected, **kwargs):
+        """Verify queue contents by browsing."""
+        bs = self.connect().session()
+        try:
+            wait_address(bs, queue)
+            assert_browse_retry(bs, queue, expected, **kwargs)
+        finally: bs.connection.close()
+
+    def assert_browse_backup(self, queue, expected, **kwargs):
+        """Combines wait_backup and assert_browse_retry."""
+        bs = self.connect_admin().session()
+        try:
+            wait_address(bs, queue)
+            assert_browse_retry(bs, queue, expected, **kwargs)
+        finally: bs.connection.close()
+
+    def assert_connect_fail(self):
+        try:
+            self.connect()
+            self.test.fail("Expected ConnectionError")
+        except ConnectionError: pass
+
+    def try_connect(self):
+        try: return self.connect()
+        except ConnectionError: return None
+
+    def ready(self):
+        return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+
+
+class HaCluster(object):
+    _cluster_count = 0
+
+    def __init__(self, test, n, promote=True, **kwargs):
+        """Start a cluster of n brokers"""
+        self.test = test
+        self.kwargs = kwargs
+        self._brokers = []
+        self.id = HaCluster._cluster_count
+        self.broker_id = 0
+        HaCluster._cluster_count += 1
+        for i in xrange(n): self.start(False)
+        self.update_urls()
+        self[0].promote()
+
+    def next_name(self):
+        name="cluster%s-%s"%(self.id, self.broker_id)
+        self.broker_id += 1
+        return name
+
+    def start(self, update_urls=True, args=[]):
+        """Start a new broker in the cluster"""
+        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+        self._brokers.append(b)
+        if update_urls: self.update_urls()
+        return b
+
+    def update_urls(self):
+        self.url = ",".join([b.host_port() for b in self])
+        if len(self) > 1:          # No failover addresses on a single-broker cluster.
+            for b in self: b.set_brokers_url(self.url)
+
+    def connect(self, i):
+        """Connect with reconnect_urls"""
+        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+
+    def kill(self, i, promote_next=True):
+        """Kill broker i and, if promote_next is set, promote broker (i+1) % len(self)."""
+        self[i].expect = EXPECT_EXIT_FAIL
+        self[i].kill()
+        if promote_next: self[(i+1) % len(self)].promote()
+
+    def restart(self, i):
+        """Start a broker with the same port, name and data directory. It will get
+        a separate log file: foo.n.log"""
+        b = self._brokers[i]
+        self._brokers[i] = HaBroker(
+            self.test, name=b.name, port=b.port(), brokers_url=self.url,
+            **self.kwargs)
+
+    def bounce(self, i, promote_next=True):
+        """Stop and restart a broker in a cluster."""
+        if (len(self) == 1):
+            self.kill(i, promote_next=False)
+            self.restart(i)
+            self[i].ready()
+            if promote_next: self[i].promote()
+        else:
+            self.kill(i, promote_next)
+            self.restart(i)
+
+    # Behave like a list of brokers.
+    def __len__(self): return len(self._brokers)
+    def __getitem__(self,index): return self._brokers[index]
+    def __iter__(self): return self._brokers.__iter__()
+
+def wait_address(session, address):
+    """Wait for an address to become valid."""
+    def check():
+        try:
+            session.sender(address)
+            return True
+        except NotFound: return False
+    assert retry(check), "Timed out waiting for address %s"%(address)
+
+def valid_address(session, address):
+    """Test if an address is valid"""
+    try:
+        session.receiver(address)
+        return True
+    except NotFound: return False

Propchange: qpid/trunk/qpid/cpp/src/tests/ha_test.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/ha_test.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1390123&r1=1390122&r2=1390123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Sep 25 20:49:33 2012
@@ -23,230 +23,12 @@ import traceback
 from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4
 from brokertest import *
+from ha_test import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent
 from uuid import UUID
 
-log = getLogger(__name__)
-
-class QmfAgent(object):
-    """Access to a QMF broker agent."""
-    def __init__(self, address, **kwargs):
-        self._connection = Connection.establish(
-            address, client_properties={"qpid.ha-admin":1}, **kwargs)
-        self._agent = BrokerAgent(self._connection)
-
-    def __getattr__(self, name):
-        a = getattr(self._agent, name)
-        return a
-
-class Credentials(object):
-    """SASL credentials: username, password, and mechanism"""
-    def __init__(self, username, password, mechanism):
-        (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
-    def __str__(self): return "Credentials%s"%(self.tuple(),)
-
-    def tuple(self): return (self.username, self.password, self.mechanism)
-
-    def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
-    """Start a broker with HA enabled
-    @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
-    """
-    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
-                 client_credentials=None, **kwargs):
-        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        args = copy(args)
-        args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
-                 # FIXME aconway 2012-02-13: workaround slow link failover.
-                 "--link-maintenace-interval=0.1",
-                 "--ha-cluster=%s"%ha_cluster]
-        if ha_replicate is not None:
-            args += [ "--ha-replicate=%s"%ha_replicate ]
-        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
-        Broker.__init__(self, test, args, **kwargs)
-        self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
-        assert os.path.exists(self.qpid_ha_path)
-        self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
-        assert os.path.exists(self.qpid_config_path)
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        self.qpid_ha_script=import_script(self.qpid_ha_path)
-        self._agent = None
-        self.client_credentials = client_credentials
-
-    def __str__(self): return Broker.__str__(self)
-
-    def qpid_ha(self, args):
-        cred = self.client_credentials
-        url = self.host_port()
-        if cred:
-            url =cred.add_user(url)
-            args = args + ["--sasl-mechanism", cred.mechanism]
-        self.qpid_ha_script.main_except(["", "-b", url]+args)
-
-    def promote(self): self.qpid_ha(["promote"])
-    def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
-    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
-    def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
-    def agent(self):
-        if not self._agent:
-            cred = self.client_credentials
-            if cred:
-                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
-            else:
-                self._agent = QmfAgent(self.host_port())
-        return self._agent
-
-    def ha_status(self):
-        hb = self.agent().getHaBroker()
-        hb.update()
-        return hb.status
-
-    def wait_status(self, status):
-        def try_get_status():
-            self._status = self.ha_status()
-            # Ignore ConnectionError, the broker may not be up yet.
-            try:
-                self._status = self.ha_status()
-                return self._status == status;
-            except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
-            self, status, self._status)
-
-    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
-    def qpid_config(self, args):
-        assert subprocess.call(
-            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
-    def config_replicate(self, from_broker, queue):
-        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
-    def config_declare(self, queue, replication):
-        self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
-    def connect_admin(self, **kwargs):
-        cred = self.client_credentials
-        if cred:
-            return Broker.connect(
-                self, client_properties={"qpid.ha-admin":1},
-                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
-                **kwargs)
-        else:
-            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
-    def wait_backup(self, address):
-        """Wait for address to become valid on a backup broker."""
-        bs = self.connect_admin().session()
-        try: wait_address(bs, address)
-        finally: bs.connection.close()
-
-    def assert_browse(self, queue, expected, **kwargs):
-        """Verify queue contents by browsing."""
-        bs = self.connect().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_browse_backup(self, queue, expected, **kwargs):
-        """Combines wait_backup and assert_browse_retry."""
-        bs = self.connect_admin().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_connect_fail(self):
-        try:
-            self.connect()
-            self.test.fail("Expected ConnectionError")
-        except ConnectionError: pass
-
-    def try_connect(self):
-        try: return self.connect()
-        except ConnectionError: return None
-
-class HaCluster(object):
-    _cluster_count = 0
-
-    def __init__(self, test, n, promote=True, **kwargs):
-        """Start a cluster of n brokers"""
-        self.test = test
-        self.kwargs = kwargs
-        self._brokers = []
-        self.id = HaCluster._cluster_count
-        self.broker_id = 0
-        HaCluster._cluster_count += 1
-        for i in xrange(n): self.start(False)
-        self.update_urls()
-        self[0].promote()
-
-    def next_name(self):
-        name="cluster%s-%s"%(self.id, self.broker_id)
-        self.broker_id += 1
-        return name
-
-    def start(self, update_urls=True, args=[]):
-        """Start a new broker in the cluster"""
-        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
-        self._brokers.append(b)
-        if update_urls: self.update_urls()
-        return b
-
-    def update_urls(self):
-        self.url = ",".join([b.host_port() for b in self])
-        if len(self) > 1:          # No failover addresses on a 1 cluster.
-            for b in self: b.set_brokers_url(self.url)
-
-    def connect(self, i):
-        """Connect with reconnect_urls"""
-        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
-    def kill(self, i, promote_next=True):
-        """Kill broker i, promote broker i+1"""
-        self[i].expect = EXPECT_EXIT_FAIL
-        self[i].kill()
-        if promote_next: self[(i+1) % len(self)].promote()
-
-    def restart(self, i):
-        """Start a broker with the same port, name and data directory. It will get
-        a separate log file: foo.n.log"""
-        b = self._brokers[i]
-        self._brokers[i] = HaBroker(
-            self.test, name=b.name, port=b.port(), brokers_url=self.url,
-            **self.kwargs)
-
-    def bounce(self, i, promote_next=True):
-        """Stop and restart a broker in a cluster."""
-        self.kill(i, promote_next)
-        self.restart(i)
-
-    # Behave like a list of brokers.
-    def __len__(self): return len(self._brokers)
-    def __getitem__(self,index): return self._brokers[index]
-    def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
-    """Wait for an address to become valid."""
-    def check():
-        try:
-            session.sender(address)
-            return True
-        except NotFound: return False
-    assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
-    """Test if an address is valid"""
-    try:
-        session.receiver(address)
-        return True
-    except NotFound: return False
-
 class ReplicationTests(BrokerTest):
     """Correctness tests for  HA replication."""
 
@@ -927,7 +709,7 @@ class LongTests(BrokerTest):
 
                 if dead is not None:
                     brokers.restart(dead) # Restart backup
-                    brokers[dead].ready(client_properties={"qpid.ha-admin":1})
+                    brokers[dead].ready()
                     dead = None
                 i += 1
         except:
@@ -1031,5 +813,5 @@ if __name__ == "__main__":
         os.execvp("qpid-python-test",
                   ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
     else:
-        print "Skipping ha_tests, qpid_ha not available"
+        print "Skipping ha_tests, %s not available"%(qpid_ha)
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org