You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC

svn commit: r926686 [4/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/ cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/ cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/ cpp/include/qpid/client/amqp0_10/ cpp/incl...

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py Tue Mar 23 18:00:49 2010
@@ -29,9 +29,19 @@ from itertools import chain
 
 log = getLogger("qpid.cluster_tests")
 
+# Note: brokers that shut themselves down due to critical error during
+# normal operation will still have an exit code of 0. Brokers that
+# shut down because of an error found during initialization will exit with
+# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
+# and EXPECT_EXIT_FAIL in some of the tests below.
+
+# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# should give non-0 exit status.
+
 # Import scripts as modules
 qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
 
+
 def readfile(filename):
     """Returns te content of file named filename as a string"""
     f = file(filename)
@@ -144,7 +154,7 @@ class LongTests(BrokerTest):
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
             ErrorGenerator(b)
-            time.sleep(1)
+            time.sleep(min(5,self.duration()/2))
         sender.stop()
         receiver.stop(sender.sent)
         for i in range(i, len(cluster)): cluster[i].kill()
@@ -152,7 +162,7 @@ class LongTests(BrokerTest):
     def test_management(self):
         """Run management clients and other clients concurrently."""
 
-        # FIXME aconway 2010-03-03: move to framework
+        # TODO aconway 2010-03-03: move to brokertest framework
         class ClientLoop(StoppableThread):
             """Run a client executable in a loop."""
             def __init__(self, broker, cmd):
@@ -173,14 +183,21 @@ class LongTests(BrokerTest):
                                 self.cmd, expect=EXPECT_UNKNOWN)
                         finally: self.lock.release()
                         try: exit = self.process.wait()
-                        except: exit = 1
+                        except OSError, e:
+                            # Seems to be a race in wait(), it throws
+                            # "no such process" during test shutdown.
+                            # Doesn't indicate a test error, ignore.
+                            return
+                        except Exception, e:
+                            self.process.unexpected(
+                                "client of %s: %s"%(self.broker.name, e))
                         self.lock.acquire()
                         try:
                             # Quit and ignore errors if stopped or expecting failure.
                             if self.stopped: break
                             if exit != 0:
-                                self.process.unexpected("client of %s exit status %s" %
-                                                        (self.broker.name, exit))
+                                self.process.unexpected(
+                                    "client of %s exit code %s"%(self.broker.name, exit))
                         finally: self.lock.release()
                 except Exception, e:
                     self.error = RethrownException("Error in ClientLoop.run")
@@ -218,9 +235,7 @@ class LongTests(BrokerTest):
                 ["perftest", "--count", 1000,
                  "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
                 ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
-                [os.path.join(self.rootdir, "testagent/testagent"), "localhost",
-                 str(broker.port())]
-                ]:
+                ["testagent", "localhost", str(broker.port())] ]:
                 batch.append(ClientLoop(broker, cmd))
             clients.append(batch)
 
@@ -238,7 +253,7 @@ class LongTests(BrokerTest):
             start_mclients(b)
 
         while time.time() < endtime:
-            time.sleep(min(5,self.duration()))
+            time.sleep(min(5,self.duration()/2))
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker. Ignore errors on its clients and all the mclients
             for c in clients[alive] + mclients: c.expect_fail()
@@ -252,7 +267,6 @@ class LongTests(BrokerTest):
             b = cluster.start()
             start_clients(b)
             for b in cluster[alive:]: start_mclients(b)
-
         for c in chain(mclients, *clients):
             c.stop()
 
@@ -283,6 +297,11 @@ class StoreTests(BrokerTest):
         m = cluster.start("restartme").get_message("q")
         self.assertEqual("x", m.content)
 
+    def stop_cluster(self,broker):
+        """Clean shut-down of a cluster"""
+        self.assertEqual(0, qpid_cluster.main(
+            ["qpid-cluster", "-kf", broker.host_port()]))
+
     def test_persistent_restart(self):
         """Verify persistent cluster shutdown/restart scenarios"""
         cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
@@ -298,7 +317,7 @@ class StoreTests(BrokerTest):
         self.assertEqual(c.get_message("q").content, "2")
         # Shut down the entire cluster cleanly and bring it back up
         a.send_message("q", Message("3", durable=True))
-        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         a = cluster.start("a", wait=False)
         b = cluster.start("b", wait=False)
         c = cluster.start("c", wait=True)
@@ -316,7 +335,7 @@ class StoreTests(BrokerTest):
         b.kill()
         self.assertEqual(c.get_message("q").content, "4")
         c.send_message("q", Message("clean", durable=True))
-        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]))
+        self.stop_cluster(c)
         a = cluster.start("a", wait=False)
         b = cluster.start("b", wait=False)
         c = cluster.start("c", wait=True)
@@ -329,7 +348,7 @@ class StoreTests(BrokerTest):
         a.terminate()
         cluster2 = self.cluster(1, args=self.args())
         try:
-            a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+            a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
             a.ready()
             self.fail("Expected exception")
         except: pass
@@ -339,27 +358,29 @@ class StoreTests(BrokerTest):
         cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
         a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
         b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         self.assertEqual(a.wait(), 0)
         self.assertEqual(b.wait(), 0)
 
         # Restart with a different member and shut down.
         a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
         c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         self.assertEqual(a.wait(), 0)
         self.assertEqual(c.wait(), 0)
-
         # Mix members from both shutdown events, they should fail
-        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        # FIXME aconway 2010-03-11: can't predict the exit status of these
+        # as it depends on the order of delivery of initial-status messages.
+        # See comment at top of this file.
+        a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
+        b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
 
     def assert_dirty_store(self, broker):
-        self.assertRaises(Exception, lambda: broker.ready())
+        assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
         msg = re.compile("critical.*no clean store")
-        assert msg.search(readfile(broker.log))
+        assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
 
     def test_solo_store_clean(self):
         # A single node cluster should always leave a clean store.
@@ -371,7 +392,6 @@ class StoreTests(BrokerTest):
         self.assertEqual(a.get_message("q").content, "x")
 
     def test_last_store_clean(self):
-
         # Verify that only the last node in a cluster to shut down has
         # a clean store. Start with cluster of 3, reduce to 1 then
         # increase again to ensure that a node that was once alone but
@@ -390,13 +410,41 @@ class StoreTests(BrokerTest):
         time.sleep(0.1)   # pause for a to find out hes last.
         a.kill()          # really last
         # b & c should be dirty
-        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
         self.assert_dirty_store(b)
-        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
         self.assert_dirty_store(c)
         # a should be clean
         a = cluster.start("a")
         self.assertEqual(a.get_message("q").content, "x")
 
-
+    def test_restart_clean(self):
+        """Verify that we can re-start brokers one by one in a
+        persistent cluster after a clean shutdown"""
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_OK)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK)
+        a.send_message("q", Message("x", durable=True))
+        self.stop_cluster(a)
+        a = cluster.start("a")
+        b = cluster.start("b")
+        c = cluster.start("c")
+        self.assertEqual(c.get_message("q").content, "x")
+
+    def test_join_sub_size(self):
+        """Verify that after starting a cluster with cluster-size=N,
+        we can join new members even if size < N-1"""
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
+        c = cluster.start("c")
+        a.send_message("q", Message("x", durable=True))
+        a.send_message("q", Message("y", durable=True))
+        a.kill()
+        b.kill()
+        a = cluster.start("a")
+        self.assertEqual(c.get_message("q").content, "x")
+        b = cluster.start("b")
+        self.assertEqual(c.get_message("q").content, "y")
 

Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp Tue Mar 23 18:00:49 2010
@@ -27,12 +27,14 @@
 #include <qpid/Options.h>
 #include <qpid/log/Logger.h>
 #include <qpid/log/Options.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
 #include "TestOptions.h"
 
 #include <iostream>
-
+#include <memory>
 
 using namespace qpid::messaging;
+using qpid::client::amqp0_10::FailoverUpdates;
 
 using namespace std;
 
@@ -54,6 +56,7 @@ struct Options : public qpid::Options
     uint tx;
     uint rollbackFrequency;
     bool printHeaders;
+    bool failoverUpdates;
     qpid::log::Options log;
 
     Options(const std::string& argv0=std::string())
@@ -69,6 +72,7 @@ struct Options : public qpid::Options
           tx(0),
           rollbackFrequency(0),
           printHeaders(false),
+          failoverUpdates(false),
           log(argv0)
     {
         addOptions()
@@ -84,6 +88,7 @@ struct Options : public qpid::Options
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
             ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+            ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -143,9 +148,10 @@ int main(int argc, char ** argv)
 {
     Options opts;
     if (opts.parse(argc, argv)) {
+        Connection connection(opts.connectionOptions);
         try {
-            Connection connection(opts.connectionOptions);
             connection.open(opts.url);
+            std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
             Session session = connection.newSession(opts.tx > 0);
             Receiver receiver = session.createReceiver(opts.address);
             receiver.setCapacity(opts.capacity);
@@ -201,6 +207,7 @@ int main(int argc, char ** argv)
             return 0;
         } catch(const std::exception& error) {
             std::cerr << "Failure: " << error.what() << std::endl;
+            connection.close();
         }
     }
     return 1;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp Tue Mar 23 18:00:49 2010
@@ -25,16 +25,15 @@
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/Sender.h>
 #include <qpid/messaging/Session.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
 #include "TestOptions.h"
 
 #include <fstream>
 #include <iostream>
+#include <memory>
 
 using namespace qpid::messaging;
-using qpid::framing::Uuid;
-using qpid::sys::AbsTime;
-using qpid::sys::now;
-using qpid::sys::TIME_INFINITE;
+using qpid::client::amqp0_10::FailoverUpdates;
 
 typedef std::vector<std::string> string_vector;
 
@@ -49,7 +48,6 @@ struct Options : public qpid::Options
     std::string url;
     std::string connectionOptions;
     std::string address;
-    int64_t timeout;
     uint count;
     std::string id;
     std::string replyto;
@@ -64,13 +62,13 @@ struct Options : public qpid::Options
     uint tx;
     uint rollbackFrequency;
     uint capacity;
+    bool failoverUpdates;
     qpid::log::Options log;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
           help(false),
           url("amqp:tcp:127.0.0.1"),
-          timeout(TIME_INFINITE),
           count(1),
           sendEos(0),
           durable(false),
@@ -78,13 +76,13 @@ struct Options : public qpid::Options
           tx(0),
           rollbackFrequency(0),
           capacity(0),
+          failoverUpdates(false),
           log(argv0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
             ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
             ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
-            ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
             ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
             ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
@@ -99,6 +97,7 @@ struct Options : public qpid::Options
             ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+            ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -182,9 +181,10 @@ int main(int argc, char ** argv)
 {
     Options opts;
     if (opts.parse(argc, argv)) {
+        Connection connection(opts.connectionOptions);
         try {
-            Connection connection(opts.connectionOptions);
             connection.open(opts.url);
+            std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
             Session session = connection.newSession(opts.tx > 0);
             Sender sender = session.createSender(opts.address);
             if (opts.capacity) sender.setCapacity(opts.capacity);
@@ -230,6 +230,7 @@ int main(int argc, char ** argv)
             return 0;
         } catch(const std::exception& error) {
             std::cout << "Failed: " << error.what() << std::endl;
+            connection.close();
         }
     }
     return 1;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp Tue Mar 23 18:00:49 2010
@@ -40,16 +40,33 @@ struct Args : public qpid::Options 
 {
     std::string url;
     std::string address;
+    uint size;
     uint rate;
     bool durable;
-
-    Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+    uint receiverCapacity;
+    uint senderCapacity;
+    uint ackFrequency;
+
+    Args() : 
+        url("amqp:tcp:127.0.0.1:5672"),
+        address("test-queue"),
+        size(512),
+        rate(1000),
+        durable(false),
+        receiverCapacity(0),
+        senderCapacity(0),
+        ackFrequency(1)
     {
         addOptions()
             ("url", qpid::optValue(url, "URL"), "Url to connect to.")
             ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+            ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
             ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
-            ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+            ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+            ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
+            ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
+            ("ack-frequency", qpid::optValue(ackFrequency, "N"),
+             "Ack frequency (0 implies none of the messages will get accepted)");
     }
 };
 
@@ -70,8 +87,8 @@ struct Client : qpid::sys::Runnable
 
     void run()
     {
+        Connection connection;
         try {
-            Connection connection;
             connection.open(opts.url);
             Session session = connection.newSession();
             doWork(session);
@@ -79,6 +96,7 @@ struct Client : qpid::sys::Runnable
             connection.close();
         } catch(const std::exception& error) {
             std::cout << error.what() << std::endl;
+            connection.close();
         }
     }
 
@@ -93,7 +111,8 @@ struct Publish : Client
     void doWork(Session& session)
     {
         Sender sender = session.createSender(opts.address);
-        Message msg;
+        if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
+        Message msg(std::string(opts.size, 'X'));
         uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
         uint64_t sent = 0, missedRate = 0;
         qpid::sys::AbsTime start = qpid::sys::now();
@@ -123,9 +142,12 @@ struct Consume : Client
         double maxLatency = 0;
         double totalLatency = 0;
         Receiver receiver = session.createReceiver(opts.address);
+        if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
         while (receiver.fetch(msg)) {
-            session.acknowledge();//TODO: add batching option
             ++received;
+            if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
+                session.acknowledge();
+            }
             //calculate latency
             uint64_t receivedAt = timestamp(qpid::sys::now());
             uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1 (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1 Tue Mar 23 18:00:49 2010
@@ -20,11 +20,11 @@
 # Quick and quiet topic test for make check.
 [string]$me = $myInvocation.InvocationName
 $srcdir = Split-Path $me
-& "$srcdir\topictest.ps1" -subscribers 2 -messages 2 -batches 1 > topictest.log 2>&1
+Invoke-Expression "$srcdir\topictest.ps1 -subscribers 2 -messages 2 -batches 1" > topictest.log 2>&1
 if (!$?) {
     "$me FAILED:"
     cat topictest.log
-    exit $LastExitCode
+    exit 1
 }
 Remove-Item topictest.log
 exit 0

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1 (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1 Tue Mar 23 18:00:49 2010
@@ -17,19 +17,21 @@
 # under the License.
 #
 
-# Run the C++ topic test
-$srcdir = Split-Path $myInvocation.InvocationName
-
 # Parameters with default values: s (subscribers) m (messages) b (batches)
 #                                 h (host) t (false; use transactions)
 param (
   [int]$subscribers = 10,
-  [int]$messages = 2000,
+  [int]$message_count = 2000,
   [int]$batches = 10,
   [string]$broker,
   [switch] $t           # transactional
 )
 
+# Run the C++ topic test
+[string]$me = $myInvocation.InvocationName
+$srcdir = Split-Path $me
+#$srcdir = Split-Path $myInvocation.InvocationName
+
 # Clean up old log files
 Get-Item subscriber_*.log | Remove-Item
 
@@ -41,7 +43,7 @@ if ($t) {
 . $srcdir\find_prog.ps1 .\topic_listener.exe
 
 function subscribe {
-    param ([int]$num)
+    param ([int]$num, [string]$sub)
     "Start subscriber $num"
     $LOG = "subscriber_$num.log"
     $cmdline = ".\$sub\topic_listener $transactional > $LOG 2>&1
@@ -51,7 +53,8 @@ function subscribe {
 }
 
 function publish {
-    Invoke-Expression ".\$sub\topic_publisher --messages $messages --batches $batches --subscribers $subscribers $host $transactional" 2>&1
+    param ([string]$sub)
+    Invoke-Expression ".\$sub\topic_publisher --messages $message_count --batches $batches --subscribers $subscribers $host $transactional" 2>&1
 }
 
 if ($broker.length) {
@@ -60,11 +63,11 @@ if ($broker.length) {
 
 $i = $subscribers
 while ($i -gt 0) {
-  subscribe $i
+  subscribe $i $sub
   $i--
 }
 
 # FIXME aconway 2007-03-27: Hack around startup race. Fix topic test.
 Start-Sleep 2
-publish
+publish $sub
 exit $LastExitCode

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml Tue Mar 23 18:00:49 2010
@@ -64,7 +64,6 @@
       <field name="cluster-id" type="uuid"/>>
       <field name="store-state" type="store-state"/>
       <field name="shutdown-id" type="uuid"/>
-      <field name="config-seq" type="sequence-no"/>
       <field name="first-config" type="str16"/>
     </control>
 
@@ -122,6 +121,10 @@
       encryption (e.g. ssl), ssf is the bit length of the key.  Zero if no
       encryption provided. -->
       <field name="ssf" type="uint32"/>
+      <!-- external auth id (e.g. ssl client certificate id) -->
+      <field name="authid" type="str16"/>
+      <!-- exclude certain sasl mechs, used with ssl and sasl-external -->
+      <field name="nodict" type="bit"/>
     </control>
 
     <!-- Marks the cluster-wide point when a connection is considered closed. -->

Modified: qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml Tue Mar 23 18:00:49 2010
@@ -12,18 +12,18 @@ This table list the various SASL mechani
 functionality was added to the product. 
 
 || Component   || ANONYMOUS || CRAM-MD5 || DIGEST-MD5 || EXTERNAL || GSSAPI/Kerberos || PLAIN ||
-| C++ Broker    |M3\[[#1]\] |M3\[[#1],[#2]\]|         |           |M3\[[#1],[#2]\]   |   M1   |
-| C++ Client    |M3\[[#1]\] |           |             |           |                  |   M1   |
+| C++ Broker    |M3         |M3         |  M3         |0.8\[[#1]\]|M3                |   M1   |
+| C++ Client    |M3         |0.5        |  0.5        |0.8\[[#1]\]|                  |   M1   |
 | Java Broker   |           |    M1     |             |           |                  |   M1   |
-| Java Client   |           |    M1     |             |           |                  |   M1   |
+| Java Client   |           |    M1     |             |   M1      |                  |   M1   |
 | .Net Client   |    M2     |    M2     |     M2      |   M2      |                  |   M2   |
-| Python Client |           |           |             |           |                  |   ?    |
-| Ruby Client   |           |           |             |           |                  |   ?    |
+| Python Client |0.6\[[#2]\]|0.6\[[#2]\]|0.6\[[#2]\]  |0.6\[[#2]\]|0.6\[[#2]\]       |   M4   |
+| Ruby Client   |0.6\[[#2]\]|0.6\[[#2]\]|0.6\[[#2]\]  |0.6\[[#2]\]|0.6\[[#2]\]       |   M4   |
 
 {anchor:1}
-1: Support for these will be in M3 (currently available on trunk).
+1: Only enabled for client authenticated SSL connections.
 {anchor:2}
-2: C++ Broker uses [Cyrus Sasl|http://freshmeat.net/projects/cyrussasl/] which supports CRAM-MD5 and GSSAPI but these have not been tested yet
+2: On linux only via cyrus-sasl integration.
 
 h4. Custom Mechanisms
 

Modified: qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml Tue Mar 23 18:00:49 2010
@@ -21,37 +21,73 @@
 -->
 
 <locatingRules xmlns="http://thaiopensource.com/ns/locating-rules/1.0">
-  <uri resource="Download.xml" typeId="DocBook"/>
-  <uri resource="Getting-Started.xml" typeId="DocBook"/>
-  <uri resource="WCF.xml" typeId="DocBook"/>
-  <uri resource="Book-Info.xml" typeId="DocBook"/>
-  <uri resource="Book-Info.xml" typeId="DocBook"/>
-  <uri resource="Book.xml" uri="../../../../../../../usr/share/xml/docbook5/schema/rng/5.0/docbookxi.rnc"/>
-  <uri resource="queue%20state%20replication.xml" typeId="DocBook"/>
-  <uri resource="queue state replication.xml" typeId="DocBook"/>
-  <uri resource="AMQP Ruby Messaging Client.xml" typeId="DocBook"/>
-  <uri resource="AMQP Compatibility.xml" typeId="DocBook"/>
-  <uri resource="Qpid Troubleshooting Guide.xml" typeId="DocBook"/>
-  <uri resource="Book_Info.xml" typeId="DocBook"/>
-  <uri resource="Book_Info.xml" typeId="DocBook"/>
-  <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
-  <uri resource="Configure the Broker via config.xml.xml" typeId="DocBook"/>
-  <uri resource="SSL.xml" typeId="DocBook"/>
-  <uri resource="Using Broker Federation.xml" typeId="DocBook"/>
-  <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
-  <uri resource="foo.xml" typeId="DocBook"/>
-  <uri resource="Download.xml" typeId="DocBook"/>
-  <uri resource="Download.xml" typeId="DocBook"/>
-  <uri resource="Starting a cluster.xml" typeId="DocBook"/>
-  <uri resource="queue state replication.xml" typeId="DocBook"/>
-  <uri resource="LVQ.xml" typeId="DocBook"/>
-  <uri resource="LVQ.xml" typeId="DocBook"/>
-  <uri resource="Cheat Sheet for configuring Queue Options.xml" typeId="DocBook"/>
-  <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
-  <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
-  <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
-  <uri resource="RASC.xml" typeId="DocBook"/>
-  <uri resource="Brokers.xml" typeId="DocBook"/>
-  <uri resource="AMQP.xml" typeId="DocBook"/>
-  <uri resource="qpid-book.xml" typeId="DocBook"/>
+ <uri resource="ACL.xml" typeId="DocBook"/>
+ <uri resource="Add-New-Users.xml" typeId="DocBook"/>
+ <uri resource="AMQP-C++-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Compatibility.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Java-JMS-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Messaging-Broker-Java.xml" typeId="DocBook"/>
+ <uri resource="AMQP-.NET-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Python-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Ruby-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP.xml" typeId="DocBook"/>
+ <uri resource="Binding-URL-Format.xml" typeId="DocBook"/>
+ <uri resource="Book-Info.xml" typeId="DocBook"/>
+ <uri resource="Book.xml" typeId="DocBook"/>
+ <uri resource="Broker-CPP.xml" typeId="DocBook"/>
+ <uri resource="Broker-Java.xml" typeId="DocBook"/>
+ <uri resource="Cheat-Sheet-for-configuring-Exchange-Options.xml" typeId="DocBook"/>
+ <uri resource="Cheat-Sheet-for-configuring-Queue-Options.xml" typeId="DocBook"/>
+ <uri resource="Clients.xml" typeId="DocBook"/>
+ <uri resource="Configure-ACLs.xml" typeId="DocBook"/>
+ <uri resource="Configure-Java-Qpid-to-use-a-SSL-connection.xml" typeId="DocBook"/>
+ <uri resource="Configure-Log4j-CompositeRolling-Appender.xml" typeId="DocBook"/>
+ <uri resource="Configure-the-Broker-via-config.xml.xml" typeId="DocBook"/>
+ <uri resource="Configure-the-Virtual-Hosts-via-virtualhosts.xml.xml" typeId="DocBook"/>
+ <uri resource="Configuring-Management-Users.xml" typeId="DocBook"/>
+ <uri resource="Configuring-Qpid-JMX-Management-Console.xml" typeId="DocBook"/>
+ <uri resource="Connection-URL-Format.xml" typeId="DocBook"/>
+ <uri resource="Debug-using-log4j.xml" typeId="DocBook"/>
+ <uri resource="Download.xml" typeId="DocBook"/>
+ <uri resource="Excel-AddIn.xml" typeId="DocBook"/>
+ <uri resource="FAQ.xml" typeId="DocBook"/>
+ <uri resource="foo.xml" typeId="DocBook"/>
+ <uri resource="f.xml" typeId="DocBook"/>
+ <uri resource="Getting-Started.xml" typeId="DocBook"/>
+ <uri resource="How-to-Tune-M3-Java-Broker-Performance.xml" typeId="DocBook"/>
+ <uri resource="How-to-Use-JNDI.xml" typeId="DocBook"/>
+ <uri resource="Introduction.xml" typeId="DocBook"/>
+ <uri resource="Java-Broker-Feature-Guide.xml" typeId="DocBook"/>
+ <uri resource="Java-Environment-Variables.xml" typeId="DocBook"/>
+ <uri resource="LVQ.xml" typeId="DocBook"/>
+ <uri resource="Management-Console-Security.xml" typeId="DocBook"/>
+ <uri resource="Management-Design-notes.xml" typeId="DocBook"/>
+ <uri resource="Managing-CPP-Broker.xml" typeId="DocBook"/>
+ <uri resource="MessageStore-Tool.xml" typeId="DocBook"/>
+ <uri resource="NET-User-Guide.xml" typeId="DocBook"/>
+ <uri resource="PythonBrokerTest.xml" typeId="DocBook"/>
+ <uri resource="QMan-Qpid-Management-bridge.xml" typeId="DocBook"/>
+ <uri resource="QMF-Python-Console-Tutorial.xml" typeId="DocBook"/>
+ <uri resource="Qpid-ACLs.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Interoperability-Documentation.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-Broker-Management-CLI.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-Build-How-To.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-FAQ.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console-FAQ.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console-User-Guide.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Management-Features.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Management-Framework.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Troubleshooting-Guide.xml" typeId="DocBook"/>
+ <uri resource="queue-state-replication.xml" typeId="DocBook"/>
+ <uri resource="Running-CPP-Broker.xml" typeId="DocBook"/>
+ <uri resource="SASL-Compatibility.xml" typeId="DocBook"/>
+ <uri resource="SSL.xml" typeId="DocBook"/>
+ <uri resource="Starting-a-cluster.xml" typeId="DocBook"/>
+ <uri resource="System-Properties.xml" typeId="DocBook"/>
+ <uri resource="Use-Priority-Queues.xml" typeId="DocBook"/>
+ <uri resource="Using-Broker-Federation.xml" typeId="DocBook"/>
+ <uri resource="Using-Qpid-with-other-JNDI-Providers.xml" typeId="DocBook"/>
+ <uri resource="WCF.xml" typeId="DocBook"/>
 </locatingRules>

Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
  
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
@@ -19,7 +19,7 @@
  
 -->
 
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>

Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
  
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
@@ -19,7 +19,7 @@
  
 -->
 
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>

Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
  
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
@@ -19,7 +19,7 @@
  
 -->
 
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>

Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-msbuild.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-msbuild.bat:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant-release
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-nant-release:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-nant.bat:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,3 +3,4 @@
 /qpid/branches/java-broker-0-10/qpid/java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/trunk/qpid:796646-796653
+/qpid/trunk/qpid/java:919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
 /qpid/branches/0.5-release/qpid/java/broker/bin:757268
 /qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Tue Mar 23 18:00:49 2010
@@ -31,7 +31,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.topic.TopicBinding;
 import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
 import org.apache.qpid.server.exchange.topic.TopicNormalizer;
@@ -47,7 +46,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -55,6 +53,7 @@ import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ManagementExchange implements Exchange, QMFService.Listener
@@ -69,8 +68,7 @@ public class ManagementExchange implemen
     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
             new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
 
-    private final Map<TopicBinding, FieldTable> _topicBindings = new HashMap<TopicBinding, FieldTable>();
-    private final Set<Binding> _bindingSet = new HashSet<Binding>();
+    private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
     private UUID _id;
     private static final String AGENT_BANK = "0";
 
@@ -254,21 +252,7 @@ public class ManagementExchange implemen
     public synchronized void addBinding(final Binding b)
     {
 
-        _bindingSet.add(b);
-
-        for(BindingListener listener : _listeners)
-        {
-            listener.bindingAdded(this, b);
-        }
-
-        if(_bindingSet.size() > _bindingCountHigh)
-        {
-            _bindingCountHigh = _bindingSet.size();
-        }
-
-        TopicBinding binding = new TopicBinding(new AMQShortString(b.getBindingKey()), b.getQueue(), null);
-
-        if(!_topicBindings.containsKey(binding))
+        if(_bindingSet.add(b))
         {
             AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
 
@@ -284,10 +268,20 @@ public class ManagementExchange implemen
             {
                 result.addUnfilteredQueue(b.getQueue());
             }
-            _topicBindings.put(binding, null);
 
+            result.addBinding(b);
+        }
+        
+        for(BindingListener listener : _listeners)
+        {
+            listener.bindingAdded(this, b);
+        }
 
+        if(_bindingSet.size() > _bindingCountHigh)
+        {
+            _bindingCountHigh = _bindingSet.size();
         }
+        
         String bindingKey = b.getBindingKey();
 
         if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
@@ -355,6 +349,13 @@ public class ManagementExchange implemen
             HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
             for(TopicMatcherResult result : results)
             {
+                TopicExchangeResult res = (TopicExchangeResult)result;
+
+                for(Binding b : res.getBindings())
+                {
+                    b.incrementMatches();
+                }
+                
                 queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
             }
             for(AMQQueue queue : queues)
@@ -378,14 +379,11 @@ public class ManagementExchange implemen
 
     public synchronized void removeBinding(final Binding binding)
     {
-        _bindingSet.remove(binding);
-
-        TopicBinding topicBinding = new TopicBinding(new AMQShortString(binding.getBindingKey()), binding.getQueue(), null);
-
-        if(_topicBindings.containsKey(topicBinding))
+        if(_bindingSet.remove(binding))
         {
-            AMQShortString bindingKey = TopicNormalizer.normalize(topicBinding.getBindingKey());
+            AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+            result.removeBinding(binding);
             result.removeUnfilteredQueue(binding.getQueue());
         }
 

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Mar 23 18:00:49 2010
@@ -1004,14 +1004,12 @@ public class QMFService implements Confi
 
         public Long getUnackedMessages()
         {
-            // TODO
-            return 0l;
+            return _obj.getUnackedMessageCount();
         }
 
         public Long getUnackedMessagesHigh()
         {
-            // TODO
-            return 0l;
+            return _obj.getUnackedMessageCountHigh();
         }
 
         public Long getUnackedMessagesLow()
@@ -1404,8 +1402,7 @@ public class QMFService implements Confi
 
         public Long getDelivered()
         {
-            // TODO
-            return 0l;
+            return _obj.getDelivered();
         }
 
         public UUID getId()

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Tue Mar 23 18:00:49 2010
@@ -110,7 +110,7 @@ public class AMQBrokerManagerMBean exten
     public String[] getExchangeTypes() throws IOException
     {
         ArrayList<String> exchangeTypes = new ArrayList<String>();
-        for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getRegisteredTypes())
+        for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes())
         {
             exchangeTypes.add(ex.getName().toString());
         }

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Mar 23 18:00:49 2010
@@ -200,16 +200,22 @@ public class AMQChannel implements Sessi
     
     private void incrementOutstandingTxnsIfNecessary()
     {
-        //There can currently only be at most one outstanding transaction
-        //due to only having LocalTransaction support. Set value to 1 if 0.
-        _txnCount.compareAndSet(0,1);
+        if(isTransactional())
+        {
+            //There can currently only be at most one outstanding transaction
+            //due to only having LocalTransaction support. Set value to 1 if 0.
+            _txnCount.compareAndSet(0,1);
+        }
     }
     
     private void decrementOutstandingTxnsIfNecessary()
     {
-        //There can currently only be at most one outstanding transaction
-        //due to only having LocalTransaction support. Set value to 0 if 1.
-        _txnCount.compareAndSet(1,0);
+        if(isTransactional())
+        {
+            //There can currently only be at most one outstanding transaction
+            //due to only having LocalTransaction support. Set value to 0 if 1.
+            _txnCount.compareAndSet(1,0);
+        }
     }
 
     public Long getTxnStarts()
@@ -313,7 +319,7 @@ public class AMQChannel implements Sessi
                         }
                         else
                         {
-                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional()));
+                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
                         }
 
                     }
@@ -1031,7 +1037,7 @@ public class AMQChannel implements Sessi
     }
 
 
-    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional)
+    private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
             throws AMQException
     {
 
@@ -1055,7 +1061,6 @@ public class AMQChannel implements Sessi
 
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
-        private boolean _transactional;
         private IncomingMessage _incommingMessage;
         private ArrayList<? extends BaseQueue> _destinationQueues;
 
@@ -1063,7 +1068,6 @@ public class AMQChannel implements Sessi
                                      ArrayList<? extends BaseQueue> destinationQueues,
                                      boolean transactional)
         {
-            _transactional = transactional;
             _incommingMessage = currentMessage;
             _destinationQueues = destinationQueues;
         }
@@ -1074,7 +1078,7 @@ public class AMQChannel implements Sessi
             {
                 final boolean immediate = _incommingMessage.isImmediate();
 
-                final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional);
+                final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
                 MessageReference ref = amqMessage.newReference();
 
                 for(final BaseQueue queue : _destinationQueues)

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Tue Mar 23 18:00:49 2010
@@ -37,7 +37,7 @@ public class Binding
     private final UUID _id;
     private final AtomicLong _matches = new AtomicLong();
 
-    Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+    public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
     {
         _id = id;
         _bindingKey = bindingKey;
@@ -89,29 +89,30 @@ public class Binding
     @Override
     public boolean equals(final Object o)
     {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+        {
+            return true;
+        }
+        
+        if (o == null || !(o instanceof Binding))
+        {
+            return false;
+        }
 
         final Binding binding = (Binding) o;
 
-        if (!_bindingKey.equals(binding._bindingKey)) return false;
-        if (!_exchange.equals(binding._exchange)) return false;
-        if (!_queue.equals(binding._queue)) return false;
-
-        return true;
+        return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey()))
+            && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange()))
+            && (_queue == null ? binding.getQueue() == null : _queue.equals(binding.getQueue()));
     }
 
     @Override
     public int hashCode()
     {
-        int result = _bindingKey.hashCode();
-        result = 31 * result + _queue.hashCode();
-        result = 31 * result + _exchange.hashCode();
+        int result = _bindingKey == null ? 1 : _bindingKey.hashCode();
+        result = 31 * result + (_queue == null ? 3 : _queue.hashCode());
+        result = 31 * result + (_exchange == null ? 5 : _exchange.hashCode());
         return result;
     }
 
-
-
-
-
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Tue Mar 23 18:00:49 2010
@@ -75,6 +75,10 @@ public interface QueueConfig extends Con
     long getPersistentMsgEnqueues();
 
     long getPersistentMsgDequeues();
+    
+    long getUnackedMessageCount();
+    
+    long getUnackedMessageCountHigh();
 
     void purge(long request);
 }
\ No newline at end of file

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java Tue Mar 23 18:00:49 2010
@@ -43,5 +43,5 @@ public interface SubscriptionConfig exte
 
     boolean isExplicitAcknowledge();
 
-
+    Long getDelivered();
 }
\ No newline at end of file

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Mar 23 18:00:49 2010
@@ -20,8 +20,12 @@
  */
 package org.apache.qpid.server.exchange;
 
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQShortString;
@@ -30,10 +34,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 public class DefaultExchangeFactory implements ExchangeFactory
 {
     private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
@@ -60,6 +60,21 @@ public class DefaultExchangeFactory impl
     {
         return _exchangeClassMap.values();
     }
+    
+    public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
+    {
+        Collection<ExchangeType<? extends Exchange>> publicTypes = 
+                                new ArrayList<ExchangeType<? extends Exchange>>();
+        publicTypes.addAll(_exchangeClassMap.values());
+        
+        //Remove the ManagementExchange type if present, as it is
+        //private and cannot be created by external means
+        publicTypes.remove(ManagementExchange.TYPE);
+        
+        return publicTypes;
+    }
+    
+    
 
     public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
             throws AMQException

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Tue Mar 23 18:00:49 2010
@@ -38,6 +38,8 @@ public interface ExchangeFactory
     void initialise(VirtualHostConfiguration hostConfig);
 
     Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
+    
+    Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
 
     Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.log4j.Logger;
 import org.apache.qpid.framing.AMQTypedValue;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.message.AMQMessageHeader;
 
 /**
@@ -38,69 +39,35 @@ class HeadersBinding
     private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
 
     private final FieldTable _mappings;
+    private final Binding _binding;
     private final Set<String> required = new HashSet<String>();
     private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
 
-    private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
-    {
-        private Boolean _result = Boolean.FALSE;
-
-        public boolean processElement(String propertyName, AMQTypedValue value)
-        {
-            if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
-            {
-                _result = Boolean.TRUE;
-                return false;
-            }
-            return true;
-        }
-
-        public Object getResult()
-        {
-            return _result;
-        }
-    }
-
-    private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
-    {
-        Boolean _result = Boolean.FALSE;
-
-        public boolean processElement(String propertyName, AMQTypedValue value)
-        {
-            if(required.contains(propertyName))
-            {
-                _result = Boolean.TRUE;
-                return false;
-            }
-            return true;
-        }
-
-        public Object getResult()
-        {
-            return _result;
-        }
-    }
-
-
-
     /**
-     * Creates a binding for a set of mappings. Those mappings whose value is
+     * Creates a header binding for a set of mappings. Those mappings whose value is
      * null or the empty string are assumed only to be required headers, with
      * no constraint on the value. Those with a non-null value are assumed to
      * define a required match of value.
-     * @param mappings the defined mappings this binding should use
+     * 
+     * @param binding the binding this header binding is created from; may be null, in which case no mappings are initialised
      */
-
-    HeadersBinding(FieldTable mappings)
+    public HeadersBinding(Binding binding)
     {
-        _mappings = mappings;
-        initMappings();
+        _binding = binding;
+        if(_binding !=null)
+        {
+            _mappings = FieldTable.convertToFieldTable(_binding.getArguments());
+            initMappings();
+        }
+        else
+        {
+            _mappings = null;
+        }
     }
-
+    
     private void initMappings()
     {
-
         _mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
         {
 
@@ -133,6 +100,11 @@ class HeadersBinding
     {
         return _mappings;
     }
+    
+    public Binding getBinding()
+    {
+        return _binding;
+    }
 
     /**
      * Checks whether the supplied headers match the requirements of this binding
@@ -250,4 +222,39 @@ class HeadersBinding
     {
         return key.startsWith("X-") || key.startsWith("x-");
     }
-}
+    
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+
+        if (o == null)
+        {
+            return false;
+        }
+
+        if (!(o instanceof HeadersBinding))
+        {
+            return false;
+        }
+
+        final HeadersBinding hb = (HeadersBinding) o;
+
+        if(_binding == null)
+        {
+            if(hb.getBinding() != null)
+            {
+                return false;
+            }
+        }
+        else if (!_binding.equals(hb.getBinding()))
+        {
+            return false;
+        }
+
+        return true;
+    }
+}
\ No newline at end of file

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Mar 23 18:00:49 2010
@@ -24,8 +24,6 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Bi
 
 import javax.management.JMException;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
  * An exchange that binds queues based on a set of required headers and header values
@@ -72,7 +71,14 @@ public class HeadersExchange extends Abs
 {
 
     private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
+    
+    private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
+                            new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+    
+    private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
+                            new CopyOnWriteArrayList<HeadersBinding>();
 
+    
     public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
     {
 
@@ -102,34 +108,12 @@ public class HeadersExchange extends Abs
         }
     };
 
-
-    private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
-    private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>();
-
-
     public HeadersExchange()
     {
         super(TYPE);
     }
+    
 
-    public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
-    {
-        registerQueue(new AMQShortString(routingKey), queue, FieldTable.convertToFieldTable(args));
-    }
-
-    public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args)
-    {
-        _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + " with " + args);
-
-        Registration registration = new Registration(new HeadersBinding(args), queue, routingKey);
-        _bindings.add(registration);
-
-    }
-
-    public void deregisterQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
-    {
-        _bindings.remove(new Registration(args == null ? null : new HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new AMQShortString(routingKey)));
-    }
 
     public ArrayList<BaseQueue> doRoute(InboundMessage payload)
     {
@@ -138,24 +122,27 @@ public class HeadersExchange extends Abs
         {
             _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
         }
-        boolean routed = false;
-        ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
-        for (Registration e : _bindings)
+        
+        LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
+        
+        for (HeadersBinding hb : _bindingHeaderMatchers)
         {
-
-            if (e.binding.matches(header))
+            if (hb.matches(header))
             {
+                Binding b = hb.getBinding();
+                
+                b.incrementMatches();
+                
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
-                                  header + " to " + e.queue.getNameShortString());
+                                  header + " to " + b.getQueue().getNameShortString());
                 }
-                queues.add(e.queue);
-
-                routed = true;
+                queues.add(b.getQueue());
             }
         }
-        return queues;
+        
+        return new ArrayList<BaseQueue>(queues);
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -166,38 +153,49 @@ public class HeadersExchange extends Abs
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        return isBound(queue);
+        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+        
+        if(bindings != null)
+        {
+            for(Binding binding : bindings)
+            {
+                if(binding.getQueue().equals(queue))
+                {
+                    return true;
+                }
+            }
+        }
+        
+        return false;
     }
 
     public boolean isBound(AMQShortString routingKey)
     {
-        return hasBindings();
+        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+        return bindings != null && !bindings.isEmpty();
     }
 
     public boolean isBound(AMQQueue queue)
     {
-        for (Registration r : _bindings)
+        for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
         {
-            if (r.queue.equals(queue))
+            for(Binding binding : bindings)
             {
-                return true;
+                if(binding.getQueue().equals(queue))
+                {
+                    return true;
+                }
             }
         }
+        
         return false;
     }
 
     public boolean hasBindings()
     {
-        return !_bindings.isEmpty();
-    }
-
-
-
-    protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
-    {
-        //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
-        //but these are not yet implemented.
-        return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+        return !getBindings().isEmpty();
     }
 
     protected AbstractExchangeMBean createMBean() throws JMException
@@ -210,59 +208,51 @@ public class HeadersExchange extends Abs
         return _logger;
     }
 
-
-    static class Registration
+    protected void onBind(final Binding binding)
     {
-        private final HeadersBinding binding;
-        private final AMQQueue queue;
-        private final AMQShortString routingKey;
+        String bindingKey = binding.getBindingKey();
+        AMQQueue queue = binding.getQueue();
+        AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
+        Map<String,Object> args = binding.getArguments();
 
-        Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey)
-        {
-            this.binding = binding;
-            this.queue = queue;
-            this.routingKey = routingKey;
-        }
-
-        public int hashCode()
-        {
-            int queueHash = queue.hashCode();
-            int routingHash = routingKey == null ? 0 : routingKey.hashCode();
-            return queueHash + routingHash;
-        }
+        assert queue != null;
+        assert routingKey != null;
 
-        public boolean equals(Object o)
-        {
-            return o instanceof Registration
-                   && ((Registration) o).queue.equals(queue)
-                   && (routingKey == null ? ((Registration)o).routingKey == null
-                                          : routingKey.equals(((Registration)o).routingKey));
-        }
+        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
 
-        public HeadersBinding getBinding()
+        if(bindings == null)
         {
-            return binding;
+            bindings = new CopyOnWriteArraySet<Binding>();
+            CopyOnWriteArraySet<Binding> newBindings;
+            if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
+            {
+                bindings = newBindings;
+            }
         }
-
-        public AMQQueue getQueue()
+        
+        if(_logger.isDebugEnabled())
         {
-            return queue;
+            _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
+                          " with binding key '" +bindingKey + "' and args: " + args);
         }
 
-        public AMQShortString getRoutingKey()
-        {
-            return routingKey;
-        }
-    }
+        _bindingHeaderMatchers.add(new HeadersBinding(binding));
+        bindings.add(binding);
 
-    protected void onBind(final Binding binding)
-    {
-        registerQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
     }
 
     protected void onUnbind(final Binding binding)
     {
-        deregisterQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
+        assert binding != null;
+
+        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+        if(bindings != null)
+        {
+            bindings.remove(binding);
+        }
+        
+        _logger.debug("===============");
+        _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding)));
     }
 
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Mar 23 18:00:49 2010
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.topic.*;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.message.InboundMessage;
@@ -83,7 +84,7 @@ public class TopicExchange extends Abstr
     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
             new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
 
-    private final Map<TopicBinding, FieldTable> _bindings = new HashMap<TopicBinding, FieldTable>();
+    private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
 
     private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
 
@@ -92,20 +93,12 @@ public class TopicExchange extends Abstr
         super(TYPE);
     }
 
-    public synchronized void registerQueue(String rKey, AMQQueue queue, Map<String,Object> args)
-    {
-        try
-        {
-            registerQueue(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args));
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQInvalidArgumentException
+    protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
     {
+        AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
+        AMQQueue queue = binding.getQueue();
+        FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
+        
         assert queue != null;
         assert rKey != null;
 
@@ -114,8 +107,6 @@ public class TopicExchange extends Abstr
 
         AMQShortString routingKey = TopicNormalizer.normalize(rKey);
 
-        TopicBinding binding = new TopicBinding(rKey, queue, args);
-
         if(_bindings.containsKey(binding))
         {
             FieldTable oldArgs = _bindings.get(binding);
@@ -146,6 +137,8 @@ public class TopicExchange extends Abstr
                     return;
                 }
             }
+            
+            result.addBinding(binding);
 
         }
         else
@@ -177,6 +170,8 @@ public class TopicExchange extends Abstr
                     result.addUnfilteredQueue(queue);
                 }
             }
+            
+            result.addBinding(binding);
             _bindings.put(binding, args);
         }
 
@@ -226,7 +221,8 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
     {
-        TopicBinding binding = new TopicBinding(routingKey, queue, arguments);
+        Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
+        
         if (arguments == null)
         {
             return _bindings.containsKey(binding);
@@ -253,7 +249,7 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQShortString routingKey)
     {
-        for(TopicBinding b : _bindings.keySet())
+        for(Binding b : _bindings.keySet())
         {
             if(b.getBindingKey().equals(routingKey))
             {
@@ -266,7 +262,7 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQQueue queue)
     {
-        for(TopicBinding b : _bindings.keySet())
+        for(Binding b : _bindings.keySet())
         {
             if(b.getQueue().equals(queue))
             {
@@ -282,19 +278,16 @@ public class TopicExchange extends Abstr
         return !_bindings.isEmpty();
     }
 
-
-    public void deregisterQueue(String rKey, AMQQueue queue, Map<String, Object> args)
-    {
-        removeBinding(new TopicBinding(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args)));
-    }
-
-    private boolean removeBinding(final TopicBinding binding)
+    private boolean deregisterQueue(final Binding binding)
     {
         if(_bindings.containsKey(binding))
         {
             FieldTable bindingArgs = _bindings.remove(binding);
-            AMQShortString bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
+            AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+            
+            result.removeBinding(binding);
+            
             if(argumentsContainSelector(bindingArgs))
             {
                 try
@@ -341,8 +334,14 @@ public class TopicExchange extends Abstr
             Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
             for(TopicMatcherResult result : results)
             {
+                TopicExchangeResult res = (TopicExchangeResult)result;
 
-                queues = ((TopicExchangeResult)result).processMessage(message, queues);
+                for(Binding b : res.getBindings())
+                {
+                    b.incrementMatches();
+                }
+                
+                queues = res.processMessage(message, queues);
             }
             return queues;
         }
@@ -350,14 +349,21 @@ public class TopicExchange extends Abstr
 
     }
 
-    protected void onBind(final org.apache.qpid.server.binding.Binding binding)
+    protected void onBind(final Binding binding)
     {
-        registerQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+        try
+        {
+            registerQueue(binding);
+        }
+        catch (AMQInvalidArgumentException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
-    protected void onUnbind(final org.apache.qpid.server.binding.Binding binding)
+    protected void onUnbind(final Binding binding)
     {
-        deregisterQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+        deregisterQueue(binding);
     }
 
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Tue Mar 23 18:00:49 2010
@@ -21,14 +21,22 @@
 package org.apache.qpid.server.exchange.topic;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
+    private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
     private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
     private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
 
@@ -64,6 +72,20 @@ public final class TopicExchangeResult i
         return _unfilteredQueues.keySet();
     }
 
+    public void addBinding(Binding binding)
+    {
+        _bindings.add(binding);
+    }
+    
+    public void removeBinding(Binding binding)
+    {
+        _bindings.remove(binding);
+    }
+    
+    public List<Binding> getBindings()
+    {
+        return new ArrayList<Binding>(_bindings);
+    }
 
     public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
     {

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,4 +3,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606

Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,4 +3,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 23 18:00:49 2010
@@ -111,6 +111,7 @@ public interface AMQQueue extends Managa
 
     void dequeue(QueueEntry entry, Subscription sub);
 
+    void decrementUnackedMsgCount();
 
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;

Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 23 18:00:49 2010
@@ -226,22 +226,29 @@ public class QueueEntryImpl implements Q
 
     public void release()
     {
-        _stateUpdater.set(this,AVAILABLE_STATE);
-        if(!getQueue().isDeleted())
+        EntryState state = _state;
+        
+        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
-            getQueue().requeue(this);
-            if(_stateChangeListeners != null)
+            if(state instanceof SubscriptionAcquiredState)
             {
-                notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+                getQueue().decrementUnackedMsgCount();
             }
+            
+            if(!getQueue().isDeleted())
+            {
+                getQueue().requeue(this);
+                if(_stateChangeListeners != null)
+                {
+                    notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+                }
 
+            }
+            else if(acquire())
+            {
+                routeToAlternate();
+            }
         }
-        else if(acquire())
-        {
-            routeToAlternate();
-        }
-
-
     }
 
     public boolean releaseButRetain()
@@ -369,6 +376,7 @@ public class QueueEntryImpl implements Q
             Subscription s = null;
             if (state instanceof SubscriptionAcquiredState)
             {
+                getQueue().decrementUnackedMsgCount();
                 s = ((SubscriptionAcquiredState) state).getSubscription();
                 s.onDequeue(this);
             }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org