You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/26 23:26:38 UTC

svn commit: r1366206 - in /qpid/branches/0.18/qpid/cpp/src: CMakeLists.txt ha.mk qpid/ha/AlternateExchangeSetter.h qpid/ha/BrokerReplicator.cpp qpid/ha/BrokerReplicator.h tests/ha_tests.py

Author: aconway
Date: Thu Jul 26 21:26:37 2012
New Revision: 1366206

URL: http://svn.apache.org/viewvc?rev=1366206&view=rev
Log:
QPID-4107 HA does not replicate alternate-exchange

Set alternate exchange on replicated queues and exchanges.  If the exchange is
available, set it immediately. Otherwise remember what needs to be set so it can
be set when the exchange becomes available.

Added:
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h   (with props)
Modified:
    qpid/branches/0.18/qpid/cpp/src/CMakeLists.txt
    qpid/branches/0.18/qpid/cpp/src/ha.mk
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/0.18/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/CMakeLists.txt?rev=1366206&r1=1366205&r2=1366206&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/0.18/qpid/cpp/src/CMakeLists.txt Thu Jul 26 21:26:37 2012
@@ -626,6 +626,7 @@ set (ha_default ON)
 option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
 if (BUILD_HA)
     set (ha_SOURCES
+        qpid/ha/AltExchangeSetter.h
 	qpid/ha/BackupConnectionExcluder.h
 	qpid/ha/BrokerInfo.cpp
 	qpid/ha/BrokerInfo.h

Modified: qpid/branches/0.18/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/ha.mk?rev=1366206&r1=1366205&r2=1366206&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/0.18/qpid/cpp/src/ha.mk Thu Jul 26 21:26:37 2012
@@ -23,6 +23,7 @@
 dmoduleexec_LTLIBRARIES += ha.la
 
 ha_la_SOURCES =					\
+  qpid/ha/AltExchangeSetter.h			\
   qpid/ha/Backup.cpp				\
   qpid/ha/Backup.h				\
   qpid/ha/BackupConnectionExcluder.h		\

Added: qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h?rev=1366206&view=auto
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h (added)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h Thu Jul 26 21:26:37 2012
@@ -0,0 +1,73 @@
+#ifndef QPID_HA_ALTERNATEEXCHANGESETTER_H
+#define QPID_HA_ALTERNATEEXCHANGESETTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "boost/function.hpp"
+#include <map>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Sets the alternate exchange on queues and exchanges.
+ * Holds onto queues/exchanges if necessary till the alternate exchange is available.
+ * THREAD UNSAFE
+ */
+class AlternateExchangeSetter
+{
+  public:
+    typedef boost::function<void(boost::shared_ptr<broker::Exchange>)> SetFunction;
+
+    AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
+
+    void setAlternate(const std::string& altEx, const SetFunction& setter) {
+        broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+        if (ex) setter(ex);     // Set immediately.
+        else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
+    }
+
+    void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
+        // Update the setters for this exchange
+        std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());
+        for (Setters::iterator i = range.first; i != range.second; ++i) 
+            i->second(exchange);
+        setters.erase(range.first, range.second);
+    }
+
+    void clear() {
+        if (!setters.empty())
+            QPID_LOG(warning, "Some alternate exchanges were not resolved.");
+        setters.clear();
+    }
+
+  private:
+    typedef std::multimap<std::string, SetFunction> Setters;
+    broker::ExchangeRegistry& exchanges;
+    Setters setters;
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_ALTERNATEEXCHANGESETTER_H*/

Propchange: qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/0.18/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1366206&r1=1366205&r2=1366206&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jul 26 21:26:37 2012
@@ -41,6 +41,7 @@
 #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include <algorithm>
 #include <sstream>
+#include <iostream>
 #include <assert.h>
 
 namespace qpid {
@@ -56,6 +57,7 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::ha::EventMembersUpdate;
 using namespace framing;
 using std::string;
+using std::ostream;
 using types::Variant;
 using namespace broker;
 
@@ -72,6 +74,7 @@ const string SCHEMA_ID("_schema_id");
 const string VALUES("_values");
 
 const string ALTEX("altEx");
+const string ALTEXCHANGE("altExchange");
 const string ARGS("args");
 const string ARGUMENTS("arguments");
 const string AUTODEL("autoDel");
@@ -93,6 +96,7 @@ const string QNAME("qName");
 const string QUEUE("queue");
 const string TYPE("type");
 const string HA_BROKER("habroker");
+const string PARTIAL("partial");
 
 const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -122,7 +126,9 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName,
+               SessionHandler& sessionHandler)
+{
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     Variant::Map request;
     request[_WHAT] = OBJECT;
@@ -142,6 +148,7 @@ void sendQuery(const string& packageName
     props->setAppId(QMF2);
     props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
     headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+    headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className);
     AMQFrame header(headerBody);
     header.setBof(false);
     header.setEof(false);
@@ -164,14 +171,14 @@ Variant::Map asMapVoid(const Variant& va
     if (!value.isVoid()) return value.asMap();
     else return Variant::Map();
 }
-
 } // namespace
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
       logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
       haBroker(hb), broker(hb.getBroker()), link(l),
-      initialized(false)
+      initialized(false),
+      alternates(hb.getBroker().getExchanges())
 {}
 
 void BrokerReplicator::initialize() {
@@ -249,13 +256,15 @@ void BrokerReplicator::route(Deliverable
     if (haBroker.getStatus() == JOINING) haBroker.setStatus(CATCHUP);
 
     const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+    const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
     Variant::List list;
     try {
-        if (!isQMFv2(msg.getMessage()) || !headers)
+        if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
             throw Exception("Unexpected message, not QMF2 event or query response.");
         // decode as list
         string content = msg.getMessage().getFrames().getContent();
         amqp_0_10::ListCodec::decode(content, list);
+        QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
         if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
@@ -283,6 +292,10 @@ void BrokerReplicator::route(Deliverable
                 else if (type == BINDING) doResponseBind(values);
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
+            if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+                // We have received all of the exchange response.
+                alternates.clear();
+            }
         }
     } catch (const std::exception& e) {
         QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
@@ -309,19 +322,10 @@ void BrokerReplicator::doEventQueueDecla
             broker.getQueues().destroy(name);
             stopQueueReplicator(name);
         }
-        std::pair<boost::shared_ptr<Queue>, bool> result =
-            broker.createQueue(
-                name,
-                values[DURABLE].asBool(),
-                autoDel,
-                0, // no owner regardless of exclusivity on primary
-                // FIXME aconway 2012-07-06: handle alternate exchange
-                values[ALTEX].asString(),
-                args,
-                userId,
-                remoteHost);
-        assert(result.second);  // Should be true since we destroyed existing queue above
-        startQueueReplicator(result.first);
+        boost::shared_ptr<Queue> queue = createQueue(
+            name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
+        assert(queue);  // Should be created since we destroed the previous queue above.
+        if (queue) startQueueReplicator(queue);
     }
 }
 
@@ -359,17 +363,9 @@ void BrokerReplicator::doEventExchangeDe
             broker.getExchanges().destroy(name);
             QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
         }
-        std::pair<boost::shared_ptr<Exchange>, bool> result =
-            broker.createExchange(
-                name,
-                values[EXTYPE].asString(),
-                values[DURABLE].asBool(),
-                // FIXME aconway 2012-07-06: handle alternate exchanges
-                values[ALTEX].asString(),
-                args,
-                userId,
-                remoteHost);
-        assert(result.second);
+        boost::shared_ptr<Exchange> exchange =
+            createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
+        assert(exchange);
     }
 }
 
@@ -431,6 +427,22 @@ void BrokerReplicator::doEventMembersUpd
     haBroker.setMembership(members);
 }
 
+namespace {
+
+// Get the alternate exchange from the exchange field of a queue or exchange response.
+static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:");
+
+string getAltExchange(const types::Variant& var) {
+    if (!var.isVoid()) {
+        management::ObjectId oid(var);
+        string key = oid.getV2Key();
+        if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key);
+        return key.substr(EXCHANGE_KEY_PREFIX.size());
+    }
+    else return string();
+}
+}
+
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicationTest.isReplicated(
@@ -443,22 +455,12 @@ void BrokerReplicator::doResponseQueue(V
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    std::pair<boost::shared_ptr<Queue>, bool> result =
-        broker.createQueue(
-            name,
-            values[DURABLE].asBool(),
-            values[AUTODELETE].asBool(),
-            0 /*i.e. no owner regardless of exclusivity on master*/,
-            ""/*TODO: need to include alternate-exchange*/,
-            args,
-            userId,
-            remoteHost);
-
+    boost::shared_ptr<Queue> queue =
+        createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+                    getAltExchange(values[ALTEXCHANGE]));
     // It is normal for the queue to already exist if we are failing over.
-     if (result.second)
-         startQueueReplicator(result.first);
-     else
-         QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+    if (queue) startQueueReplicator(queue);
+    else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -468,16 +470,10 @@ void BrokerReplicator::doResponseExchang
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    bool created = broker.createExchange(
-        name,
-        values[TYPE].asString(),
-        values[DURABLE].asBool(),
-        "", // FIXME aconway 2012-07-09: need to include alternate-exchange
-        args,
-        userId,
-        remoteHost
-    ).second;
-    QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
+    boost::shared_ptr<Exchange> exchange = createExchange(
+        name, values[TYPE].asString(), values[DURABLE].asBool(), args,
+        getAltExchange(values[ALTEXCHANGE]));
+    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
 }
 
 namespace {
@@ -564,6 +560,60 @@ void BrokerReplicator::stopQueueReplicat
     }
 }
 
+boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+    const std::string& name,
+    bool durable,
+    bool autodelete,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& alternateExchange)
+{
+    std::pair<boost::shared_ptr<Queue>, bool> result =
+        broker.createQueue(
+            name,
+            durable,
+            autodelete,
+            0, // no owner regardless of exclusivity on primary
+            string(), // Set alternate exchange below
+            arguments,
+            userId,
+            remoteHost);
+    if (result.second) {
+        if (!alternateExchange.empty()) {
+            alternates.setAlternate(
+                alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+        }
+        return result.first;
+    }
+    else return  boost::shared_ptr<Queue>();
+}
+
+boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+    const std::string& name,
+    const std::string& type,
+    bool durable,
+    const qpid::framing::FieldTable& args,
+    const std::string& alternateExchange)
+{
+    std::pair<boost::shared_ptr<Exchange>, bool> result =
+        broker.createExchange(
+            name,
+            type,
+            durable,
+            string(),  // Set alternate exchange below
+            args,
+            userId,
+            remoteHost);
+    if (result.second) {
+        alternates.addExchange(result.first);
+        if (!alternateExchange.empty()) {
+            alternates.setAlternate(
+                alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
+        }
+        return result.first;
+    }
+    else return  boost::shared_ptr<Exchange>();
+}
+
 bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
 bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
 bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1366206&r1=1366205&r2=1366206&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Jul 26 21:26:37 2012
@@ -24,8 +24,10 @@
 
 #include "types.h"
 #include "ReplicationTest.h"
+#include "AlternateExchangeSetter.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/types/Variant.h"
+#include "qpid/management/ManagementObject.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
 
@@ -95,6 +97,20 @@ class BrokerReplicator : public broker::
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
     void stopQueueReplicator(const std::string& name);
 
+    boost::shared_ptr<broker::Queue> createQueue(
+        const std::string& name,
+        bool durable,
+        bool autodelete,
+        const qpid::framing::FieldTable& arguments,
+        const std::string& alternateExchange);
+
+    boost::shared_ptr<broker::Exchange> createExchange(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const qpid::framing::FieldTable& args,
+        const std::string& alternateExchange);
+
     std::string logPrefix;
     std::string userId, remoteHost;
     ReplicationTest replicationTest;
@@ -102,6 +118,7 @@ class BrokerReplicator : public broker::
     broker::Broker& broker;
     boost::shared_ptr<broker::Link> link;
     bool initialized;
+    AlternateExchangeSetter alternates;
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py?rev=1366206&r1=1366205&r2=1366206&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/0.18/qpid/cpp/src/tests/ha_tests.py Thu Jul 26 21:26:37 2012
@@ -20,7 +20,7 @@
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
 from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
@@ -63,6 +63,7 @@ class HaBroker(Broker):
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
                  "--log-enable=debug+:ha::",
+                 "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: 
                  # FIXME aconway 2012-02-13: workaround slow link failover.
                  "--link-maintenace-interval=0.1",
                  "--ha-cluster=%s"%ha_cluster]
@@ -188,7 +189,7 @@ class HaCluster(object):
         self.broker_id += 1
         return name
 
-    def start(self, update_urls=True):
+    def start(self, update_urls=True, args=[]):
         """Start a new broker in the cluster"""
         b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
         self._brokers.append(b)
@@ -760,6 +761,52 @@ acl deny all all
         s1.sender("ex").send("foo");
         self.assertEqual(s1.receiver("q").fetch().content, "foo")
 
+    def test_alterante_exchange(self):
+        """Verify that alternate-exchange on exchanges and queues is propagated
+        to new members of a cluster. """
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session()
+        # altex exchange: acts as alternate exchange
+        s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+        # altq queue bound to altex, collect re-routed messages.
+        s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+        # 0ex exchange with alternate-exchange altex and no queues bound
+        s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+        # create queue q with alternate-exchange altex
+        s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+        # create a bunch of exchanges to ensure we don't clean up prematurely if the
+        # response comes in multiple fragments.
+        for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+        def verify(broker):
+            s = broker.connect().session()
+            # Verify unmatched message goes to ex's alternate.
+            s.sender("0ex").send("foo")
+            altq = s.receiver("altq")
+            self.assertEqual("foo", altq.fetch(timeout=0).content)
+            s.acknowledge()
+            # Verify rejected message goes to q's alternate.
+            s.sender("q").send("bar")
+            msg = s.receiver("q").fetch(timeout=0)
+            self.assertEqual("bar", msg.content)
+            s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+            self.assertEqual("bar", altq.fetch(timeout=0).content)
+            s.acknowledge()
+
+        # Sanity check: alternate exchanges on original broker
+        verify(cluster[0])
+        # Check backup that was connected during setup.
+        cluster[1].wait_backup("0ex")
+        cluster[1].wait_backup("q")
+        cluster.bounce(0)
+        verify(cluster[1])
+        # Check a newly started backup.
+        cluster.start()
+        cluster[2].wait_backup("0ex")
+        cluster[2].wait_backup("q")
+        cluster.bounce(1)
+        verify(cluster[2])
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org