You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/23 19:00:56 UTC
svn commit: r926686 [4/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/
cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/
cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/
cpp/include/qpid/client/amqp0_10/ cpp/incl...
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py Tue Mar 23 18:00:49 2010
@@ -29,9 +29,19 @@ from itertools import chain
log = getLogger("qpid.cluster_tests")
+# Note: brokers that shut themselves down due to critical error during
+# normal operation will still have an exit code of 0. Brokers that
+# shut down because of an error found during initialize will exit with
+# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
+# and EXPECT_EXIT_FAIL in some of the tests below.
+
+# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# should give non-0 exit status.
+
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
+
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
@@ -144,7 +154,7 @@ class LongTests(BrokerTest):
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
ErrorGenerator(b)
- time.sleep(1)
+ time.sleep(min(5,self.duration()/2))
sender.stop()
receiver.stop(sender.sent)
for i in range(i, len(cluster)): cluster[i].kill()
@@ -152,7 +162,7 @@ class LongTests(BrokerTest):
def test_management(self):
"""Run management clients and other clients concurrently."""
- # FIXME aconway 2010-03-03: move to framework
+ # TODO aconway 2010-03-03: move to brokertest framework
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
def __init__(self, broker, cmd):
@@ -173,14 +183,21 @@ class LongTests(BrokerTest):
self.cmd, expect=EXPECT_UNKNOWN)
finally: self.lock.release()
try: exit = self.process.wait()
- except: exit = 1
+ except OSError, e:
+ # Seems to be a race in wait(), it throws
+ # "no such process" during test shutdown.
+ # Doesn't indicate a test error, ignore.
+ return
+ except Exception, e:
+ self.process.unexpected(
+ "client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
# Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
if exit != 0:
- self.process.unexpected("client of %s exit status %s" %
- (self.broker.name, exit))
+ self.process.unexpected(
+ "client of %s exit code %s"%(self.broker.name, exit))
finally: self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
@@ -218,9 +235,7 @@ class LongTests(BrokerTest):
["perftest", "--count", 1000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
- [os.path.join(self.rootdir, "testagent/testagent"), "localhost",
- str(broker.port())]
- ]:
+ ["testagent", "localhost", str(broker.port())] ]:
batch.append(ClientLoop(broker, cmd))
clients.append(batch)
@@ -238,7 +253,7 @@ class LongTests(BrokerTest):
start_mclients(b)
while time.time() < endtime:
- time.sleep(min(5,self.duration()))
+ time.sleep(min(5,self.duration()/2))
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker. Ignore errors on its clients and all the mclients
for c in clients[alive] + mclients: c.expect_fail()
@@ -252,7 +267,6 @@ class LongTests(BrokerTest):
b = cluster.start()
start_clients(b)
for b in cluster[alive:]: start_mclients(b)
-
for c in chain(mclients, *clients):
c.stop()
@@ -283,6 +297,11 @@ class StoreTests(BrokerTest):
m = cluster.start("restartme").get_message("q")
self.assertEqual("x", m.content)
+ def stop_cluster(self,broker):
+ """Clean shut-down of a cluster"""
+ self.assertEqual(0, qpid_cluster.main(
+ ["qpid-cluster", "-kf", broker.host_port()]))
+
def test_persistent_restart(self):
"""Verify persistent cluster shutdown/restart scenarios"""
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
@@ -298,7 +317,7 @@ class StoreTests(BrokerTest):
self.assertEqual(c.get_message("q").content, "2")
# Shut down the entire cluster cleanly and bring it back up
a.send_message("q", Message("3", durable=True))
- self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
@@ -316,7 +335,7 @@ class StoreTests(BrokerTest):
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
- self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]))
+ self.stop_cluster(c)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
@@ -329,7 +348,7 @@ class StoreTests(BrokerTest):
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
- a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+ a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
a.ready()
self.fail("Expected exception")
except: pass
@@ -339,27 +358,29 @@ class StoreTests(BrokerTest):
cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(b.wait(), 0)
# Restart with a different member and shut down.
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
- self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(c.wait(), 0)
-
# Mix members from both shutdown events, they should fail
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ # FIXME aconway 2010-03-11: can't predict the exit status of these
+ # as it depends on the order of delivery of initial-status messages.
+ # See comment at top of this file.
+ a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
+ b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
def assert_dirty_store(self, broker):
- self.assertRaises(Exception, lambda: broker.ready())
+ assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
msg = re.compile("critical.*no clean store")
- assert msg.search(readfile(broker.log))
+ assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
def test_solo_store_clean(self):
# A single node cluster should always leave a clean store.
@@ -371,7 +392,6 @@ class StoreTests(BrokerTest):
self.assertEqual(a.get_message("q").content, "x")
def test_last_store_clean(self):
-
# Verify that only the last node in a cluster to shut down has
# a clean store. Start with cluster of 3, reduce to 1 then
# increase again to ensure that a node that was once alone but
@@ -390,13 +410,41 @@ class StoreTests(BrokerTest):
time.sleep(0.1) # pause for a to find out hes last.
a.kill() # really last
# b & c should be dirty
- b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
self.assert_dirty_store(b)
- c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+ c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
self.assert_dirty_store(c)
# a should be clean
a = cluster.start("a")
self.assertEqual(a.get_message("q").content, "x")
-
+ def test_restart_clean(self):
+ """Verify that we can re-start brokers one by one in a
+ persistent cluster after a clean oshutdown"""
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_OK)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK)
+ a.send_message("q", Message("x", durable=True))
+ self.stop_cluster(a)
+ a = cluster.start("a")
+ b = cluster.start("b")
+ c = cluster.start("c")
+ self.assertEqual(c.get_message("q").content, "x")
+
+ def test_join_sub_size(self):
+ """Verify that after starting a cluster with cluster-size=N,
+ we can join new members even if size < N-1"""
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
+ c = cluster.start("c")
+ a.send_message("q", Message("x", durable=True))
+ a.send_message("q", Message("y", durable=True))
+ a.kill()
+ b.kill()
+ a = cluster.start("a")
+ self.assertEqual(c.get_message("q").content, "x")
+ b = cluster.start("b")
+ self.assertEqual(c.get_message("q").content, "y")
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_recv.cpp Tue Mar 23 18:00:49 2010
@@ -27,12 +27,14 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <iostream>
-
+#include <memory>
using namespace qpid::messaging;
+using qpid::client::amqp0_10::FailoverUpdates;
using namespace std;
@@ -54,6 +56,7 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
bool printHeaders;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -69,6 +72,7 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
printHeaders(false),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
@@ -84,6 +88,7 @@ struct Options : public qpid::Options
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -143,9 +148,10 @@ int main(int argc, char ** argv)
{
Options opts;
if (opts.parse(argc, argv)) {
+ Connection connection(opts.connectionOptions);
try {
- Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
@@ -201,6 +207,7 @@ int main(int argc, char ** argv)
return 0;
} catch(const std::exception& error) {
std::cerr << "Failure: " << error.what() << std::endl;
+ connection.close();
}
}
return 1;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_send.cpp Tue Mar 23 18:00:49 2010
@@ -25,16 +25,15 @@
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <fstream>
#include <iostream>
+#include <memory>
using namespace qpid::messaging;
-using qpid::framing::Uuid;
-using qpid::sys::AbsTime;
-using qpid::sys::now;
-using qpid::sys::TIME_INFINITE;
+using qpid::client::amqp0_10::FailoverUpdates;
typedef std::vector<std::string> string_vector;
@@ -49,7 +48,6 @@ struct Options : public qpid::Options
std::string url;
std::string connectionOptions;
std::string address;
- int64_t timeout;
uint count;
std::string id;
std::string replyto;
@@ -64,13 +62,13 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
uint capacity;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- timeout(TIME_INFINITE),
count(1),
sendEos(0),
durable(false),
@@ -78,13 +76,13 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
capacity(0),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
- ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
@@ -99,6 +97,7 @@ struct Options : public qpid::Options
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -182,9 +181,10 @@ int main(int argc, char ** argv)
{
Options opts;
if (opts.parse(argc, argv)) {
+ Connection connection(opts.connectionOptions);
try {
- Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
if (opts.capacity) sender.setCapacity(opts.capacity);
@@ -230,6 +230,7 @@ int main(int argc, char ** argv)
return 0;
} catch(const std::exception& error) {
std::cout << "Failed: " << error.what() << std::endl;
+ connection.close();
}
}
return 1;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/qpid_stream.cpp Tue Mar 23 18:00:49 2010
@@ -40,16 +40,33 @@ struct Args : public qpid::Options
{
std::string url;
std::string address;
+ uint size;
uint rate;
bool durable;
-
- Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+ uint receiverCapacity;
+ uint senderCapacity;
+ uint ackFrequency;
+
+ Args() :
+ url("amqp:tcp:127.0.0.1:5672"),
+ address("test-queue"),
+ size(512),
+ rate(1000),
+ durable(false),
+ receiverCapacity(0),
+ senderCapacity(0),
+ ackFrequency(1)
{
addOptions()
("url", qpid::optValue(url, "URL"), "Url to connect to.")
("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+ ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+ ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"),
+ "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -70,8 +87,8 @@ struct Client : qpid::sys::Runnable
void run()
{
+ Connection connection;
try {
- Connection connection;
connection.open(opts.url);
Session session = connection.newSession();
doWork(session);
@@ -79,6 +96,7 @@ struct Client : qpid::sys::Runnable
connection.close();
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
}
@@ -93,7 +111,8 @@ struct Publish : Client
void doWork(Session& session)
{
Sender sender = session.createSender(opts.address);
- Message msg;
+ if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
+ Message msg(std::string(opts.size, 'X'));
uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
qpid::sys::AbsTime start = qpid::sys::now();
@@ -123,9 +142,12 @@ struct Consume : Client
double maxLatency = 0;
double totalLatency = 0;
Receiver receiver = session.createReceiver(opts.address);
+ if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
while (receiver.fetch(msg)) {
- session.acknowledge();//TODO: add batching option
++received;
+ if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1 (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/quick_topictest.ps1 Tue Mar 23 18:00:49 2010
@@ -20,11 +20,11 @@
# Quick and quiet topic test for make check.
[string]$me = $myInvocation.InvocationName
$srcdir = Split-Path $me
-& "$srcdir\topictest.ps1" -subscribers 2 -messages 2 -batches 1 > topictest.log 2>&1
+Invoke-Expression "$srcdir\topictest.ps1 -subscribers 2 -messages 2 -batches 1" > topictest.log 2>&1
if (!$?) {
"$me FAILED:"
cat topictest.log
- exit $LastExitCode
+ exit 1
}
Remove-Item topictest.log
exit 0
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1 (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/topictest.ps1 Tue Mar 23 18:00:49 2010
@@ -17,19 +17,21 @@
# under the License.
#
-# Run the C++ topic test
-$srcdir = Split-Path $myInvocation.InvocationName
-
# Parameters with default values: s (subscribers) m (messages) b (batches)
# h (host) t (false; use transactions)
param (
[int]$subscribers = 10,
- [int]$messages = 2000,
+ [int]$message_count = 2000,
[int]$batches = 10,
[string]$broker,
[switch] $t # transactional
)
+# Run the C++ topic test
+[string]$me = $myInvocation.InvocationName
+$srcdir = Split-Path $me
+#$srcdir = Split-Path $myInvocation.InvocationName
+
# Clean up old log files
Get-Item subscriber_*.log | Remove-Item
@@ -41,7 +43,7 @@ if ($t) {
. $srcdir\find_prog.ps1 .\topic_listener.exe
function subscribe {
- param ([int]$num)
+ param ([int]$num, [string]$sub)
"Start subscriber $num"
$LOG = "subscriber_$num.log"
$cmdline = ".\$sub\topic_listener $transactional > $LOG 2>&1
@@ -51,7 +53,8 @@ function subscribe {
}
function publish {
- Invoke-Expression ".\$sub\topic_publisher --messages $messages --batches $batches --subscribers $subscribers $host $transactional" 2>&1
+ param ([string]$sub)
+ Invoke-Expression ".\$sub\topic_publisher --messages $message_count --batches $batches --subscribers $subscribers $host $transactional" 2>&1
}
if ($broker.length) {
@@ -60,11 +63,11 @@ if ($broker.length) {
$i = $subscribers
while ($i -gt 0) {
- subscribe $i
+ subscribe $i $sub
$i--
}
# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test.
Start-Sleep 2
-publish
+publish $sub
exit $LastExitCode
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/xml/cluster.xml Tue Mar 23 18:00:49 2010
@@ -64,7 +64,6 @@
<field name="cluster-id" type="uuid"/>>
<field name="store-state" type="store-state"/>
<field name="shutdown-id" type="uuid"/>
- <field name="config-seq" type="sequence-no"/>
<field name="first-config" type="str16"/>
</control>
@@ -122,6 +121,10 @@
encryption (e.g. ssl), ssf is the bit length of the key. Zero if no
encryption provided. -->
<field name="ssf" type="uint32"/>
+ <!-- external auth id (e.g. ssl client certificate id) -->
+ <field name="authid" type="str16"/>
+ <!-- exclude certain sasl mechs, used with ssl and sasl-external -->
+ <field name="nodict" type="bit"/>
</control>
<!-- Marks the cluster-wide point when a connection is considered closed. -->
Modified: qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/doc/book/src/SASL-Compatibility.xml Tue Mar 23 18:00:49 2010
@@ -12,18 +12,18 @@ This table list the various SASL mechani
functionality was added to the product.
|| Component || ANONYMOUS || CRAM-MD5 || DIGEST-MD5 || EXTERNAL || GSSAPI/Kerberos || PLAIN ||
-| C++ Broker |M3\[[#1]\] |M3\[[#1],[#2]\]| | |M3\[[#1],[#2]\] | M1 |
-| C++ Client |M3\[[#1]\] | | | | | M1 |
+| C++ Broker |M3 |M3 | M3 |0.8\[[#1]\]|M3 | M1 |
+| C++ Client |M3 |0.5 | 0.5 |0.8\[[#1]\]| | M1 |
| Java Broker | | M1 | | | | M1 |
-| Java Client | | M1 | | | | M1 |
+| Java Client | | M1 | | M1 | | M1 |
| .Net Client | M2 | M2 | M2 | M2 | | M2 |
-| Python Client | | | | | | ? |
-| Ruby Client | | | | | | ? |
+| Python Client |0.6\[[#2]\]|0.6\[[#2]\]|0.6\[[#2]\] |0.6\[[#2]\]|0.6\[[#2]\] | M4 |
+| Ruby Client |0.6\[[#2]\]|0.6\[[#2]\]|0.6\[[#2]\] |0.6\[[#2]\]|0.6\[[#2]\] | M4 |
{anchor:1}
-1: Support for these will be in M3 (currently available on trunk).
+1: Only enabled for client authenticated SSL connections.
{anchor:2}
-2: C++ Broker uses [Cyrus Sasl|http://freshmeat.net/projects/cyrussasl/] which supports CRAM-MD5 and GSSAPI but these have not been tested yet
+2: On linux only via cyrus-sasl integration.
h4. Custom Mechanisms
Modified: qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml Tue Mar 23 18:00:49 2010
@@ -21,37 +21,73 @@
-->
<locatingRules xmlns="http://thaiopensource.com/ns/locating-rules/1.0">
- <uri resource="Download.xml" typeId="DocBook"/>
- <uri resource="Getting-Started.xml" typeId="DocBook"/>
- <uri resource="WCF.xml" typeId="DocBook"/>
- <uri resource="Book-Info.xml" typeId="DocBook"/>
- <uri resource="Book-Info.xml" typeId="DocBook"/>
- <uri resource="Book.xml" uri="../../../../../../../usr/share/xml/docbook5/schema/rng/5.0/docbookxi.rnc"/>
- <uri resource="queue%20state%20replication.xml" typeId="DocBook"/>
- <uri resource="queue state replication.xml" typeId="DocBook"/>
- <uri resource="AMQP Ruby Messaging Client.xml" typeId="DocBook"/>
- <uri resource="AMQP Compatibility.xml" typeId="DocBook"/>
- <uri resource="Qpid Troubleshooting Guide.xml" typeId="DocBook"/>
- <uri resource="Book_Info.xml" typeId="DocBook"/>
- <uri resource="Book_Info.xml" typeId="DocBook"/>
- <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
- <uri resource="Configure the Broker via config.xml.xml" typeId="DocBook"/>
- <uri resource="SSL.xml" typeId="DocBook"/>
- <uri resource="Using Broker Federation.xml" typeId="DocBook"/>
- <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
- <uri resource="foo.xml" typeId="DocBook"/>
- <uri resource="Download.xml" typeId="DocBook"/>
- <uri resource="Download.xml" typeId="DocBook"/>
- <uri resource="Starting a cluster.xml" typeId="DocBook"/>
- <uri resource="queue state replication.xml" typeId="DocBook"/>
- <uri resource="LVQ.xml" typeId="DocBook"/>
- <uri resource="LVQ.xml" typeId="DocBook"/>
- <uri resource="Cheat Sheet for configuring Queue Options.xml" typeId="DocBook"/>
- <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
- <uri resource="Cheat Sheet for configuring Exchange Options.xml" typeId="DocBook"/>
- <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
- <uri resource="RASC.xml" typeId="DocBook"/>
- <uri resource="Brokers.xml" typeId="DocBook"/>
- <uri resource="AMQP.xml" typeId="DocBook"/>
- <uri resource="qpid-book.xml" typeId="DocBook"/>
+ <uri resource="ACL.xml" typeId="DocBook"/>
+ <uri resource="Add-New-Users.xml" typeId="DocBook"/>
+ <uri resource="AMQP-C++-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Compatibility.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Java-JMS-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Messaging-Broker-CPP.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Messaging-Broker-Java.xml" typeId="DocBook"/>
+ <uri resource="AMQP-.NET-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Python-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP-Ruby-Messaging-Client.xml" typeId="DocBook"/>
+ <uri resource="AMQP.xml" typeId="DocBook"/>
+ <uri resource="Binding-URL-Format.xml" typeId="DocBook"/>
+ <uri resource="Book-Info.xml" typeId="DocBook"/>
+ <uri resource="Book.xml" typeId="DocBook"/>
+ <uri resource="Broker-CPP.xml" typeId="DocBook"/>
+ <uri resource="Broker-Java.xml" typeId="DocBook"/>
+ <uri resource="Cheat-Sheet-for-configuring-Exchange-Options.xml" typeId="DocBook"/>
+ <uri resource="Cheat-Sheet-for-configuring-Queue-Options.xml" typeId="DocBook"/>
+ <uri resource="Clients.xml" typeId="DocBook"/>
+ <uri resource="Configure-ACLs.xml" typeId="DocBook"/>
+ <uri resource="Configure-Java-Qpid-to-use-a-SSL-connection.xml" typeId="DocBook"/>
+ <uri resource="Configure-Log4j-CompositeRolling-Appender.xml" typeId="DocBook"/>
+ <uri resource="Configure-the-Broker-via-config.xml.xml" typeId="DocBook"/>
+ <uri resource="Configure-the-Virtual-Hosts-via-virtualhosts.xml.xml" typeId="DocBook"/>
+ <uri resource="Configuring-Management-Users.xml" typeId="DocBook"/>
+ <uri resource="Configuring-Qpid-JMX-Management-Console.xml" typeId="DocBook"/>
+ <uri resource="Connection-URL-Format.xml" typeId="DocBook"/>
+ <uri resource="Debug-using-log4j.xml" typeId="DocBook"/>
+ <uri resource="Download.xml" typeId="DocBook"/>
+ <uri resource="Excel-AddIn.xml" typeId="DocBook"/>
+ <uri resource="FAQ.xml" typeId="DocBook"/>
+ <uri resource="foo.xml" typeId="DocBook"/>
+ <uri resource="f.xml" typeId="DocBook"/>
+ <uri resource="Getting-Started.xml" typeId="DocBook"/>
+ <uri resource="How-to-Tune-M3-Java-Broker-Performance.xml" typeId="DocBook"/>
+ <uri resource="How-to-Use-JNDI.xml" typeId="DocBook"/>
+ <uri resource="Introduction.xml" typeId="DocBook"/>
+ <uri resource="Java-Broker-Feature-Guide.xml" typeId="DocBook"/>
+ <uri resource="Java-Environment-Variables.xml" typeId="DocBook"/>
+ <uri resource="LVQ.xml" typeId="DocBook"/>
+ <uri resource="Management-Console-Security.xml" typeId="DocBook"/>
+ <uri resource="Management-Design-notes.xml" typeId="DocBook"/>
+ <uri resource="Managing-CPP-Broker.xml" typeId="DocBook"/>
+ <uri resource="MessageStore-Tool.xml" typeId="DocBook"/>
+ <uri resource="NET-User-Guide.xml" typeId="DocBook"/>
+ <uri resource="PythonBrokerTest.xml" typeId="DocBook"/>
+ <uri resource="QMan-Qpid-Management-bridge.xml" typeId="DocBook"/>
+ <uri resource="QMF-Python-Console-Tutorial.xml" typeId="DocBook"/>
+ <uri resource="Qpid-ACLs.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Interoperability-Documentation.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-Broker-Management-CLI.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-Build-How-To.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Java-FAQ.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console-FAQ.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console-User-Guide.xml" typeId="DocBook"/>
+ <uri resource="Qpid-JMX-Management-Console.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Management-Features.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Management-Framework.xml" typeId="DocBook"/>
+ <uri resource="Qpid-Troubleshooting-Guide.xml" typeId="DocBook"/>
+ <uri resource="queue-state-replication.xml" typeId="DocBook"/>
+ <uri resource="Running-CPP-Broker.xml" typeId="DocBook"/>
+ <uri resource="SASL-Compatibility.xml" typeId="DocBook"/>
+ <uri resource="SSL.xml" typeId="DocBook"/>
+ <uri resource="Starting-a-cluster.xml" typeId="DocBook"/>
+ <uri resource="System-Properties.xml" typeId="DocBook"/>
+ <uri resource="Use-Priority-Queues.xml" typeId="DocBook"/>
+ <uri resource="Using-Broker-Federation.xml" typeId="DocBook"/>
+ <uri resource="Using-Qpid-with-other-JNDI-Providers.xml" typeId="DocBook"/>
+ <uri resource="WCF.xml" typeId="DocBook"/>
</locatingRules>
Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TestClient/TestClient.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -19,7 +19,7 @@
-->
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicListener/TopicListener.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -19,7 +19,7 @@
-->
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
Modified: qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
-<!--
+<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -19,7 +19,7 @@
-->
-<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-msbuild.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-msbuild.bat:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant-release
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-nant-release:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -0,0 +1 @@
+/qpid/trunk/qpid/dotnet/build-nant.bat:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,3 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java:795950-829653
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/trunk/qpid:796646-796653
+/qpid/trunk/qpid/java:919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -1,4 +1,4 @@
/qpid/branches/0.5-release/qpid/java/broker/bin:757268
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Tue Mar 23 18:00:49 2010
@@ -31,7 +31,6 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.topic.TopicBinding;
import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
@@ -47,7 +46,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -55,6 +53,7 @@ import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
public class ManagementExchange implements Exchange, QMFService.Listener
@@ -69,8 +68,7 @@ public class ManagementExchange implemen
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
- private final Map<TopicBinding, FieldTable> _topicBindings = new HashMap<TopicBinding, FieldTable>();
- private final Set<Binding> _bindingSet = new HashSet<Binding>();
+ private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
private UUID _id;
private static final String AGENT_BANK = "0";
@@ -254,21 +252,7 @@ public class ManagementExchange implemen
public synchronized void addBinding(final Binding b)
{
- _bindingSet.add(b);
-
- for(BindingListener listener : _listeners)
- {
- listener.bindingAdded(this, b);
- }
-
- if(_bindingSet.size() > _bindingCountHigh)
- {
- _bindingCountHigh = _bindingSet.size();
- }
-
- TopicBinding binding = new TopicBinding(new AMQShortString(b.getBindingKey()), b.getQueue(), null);
-
- if(!_topicBindings.containsKey(binding))
+ if(_bindingSet.add(b))
{
AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
@@ -284,10 +268,20 @@ public class ManagementExchange implemen
{
result.addUnfilteredQueue(b.getQueue());
}
- _topicBindings.put(binding, null);
+ result.addBinding(b);
+ }
+
+ for(BindingListener listener : _listeners)
+ {
+ listener.bindingAdded(this, b);
+ }
+ if(_bindingSet.size() > _bindingCountHigh)
+ {
+ _bindingCountHigh = _bindingSet.size();
}
+
String bindingKey = b.getBindingKey();
if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
@@ -355,6 +349,13 @@ public class ManagementExchange implemen
HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
+ TopicExchangeResult res = (TopicExchangeResult)result;
+
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
}
for(AMQQueue queue : queues)
@@ -378,14 +379,11 @@ public class ManagementExchange implemen
public synchronized void removeBinding(final Binding binding)
{
- _bindingSet.remove(binding);
-
- TopicBinding topicBinding = new TopicBinding(new AMQShortString(binding.getBindingKey()), binding.getQueue(), null);
-
- if(_topicBindings.containsKey(topicBinding))
+ if(_bindingSet.remove(binding))
{
- AMQShortString bindingKey = TopicNormalizer.normalize(topicBinding.getBindingKey());
+ AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+ result.removeBinding(binding);
result.removeUnfilteredQueue(binding.getQueue());
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Mar 23 18:00:49 2010
@@ -1004,14 +1004,12 @@ public class QMFService implements Confi
public Long getUnackedMessages()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCount();
}
public Long getUnackedMessagesHigh()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCountHigh();
}
public Long getUnackedMessagesLow()
@@ -1404,8 +1402,7 @@ public class QMFService implements Confi
public Long getDelivered()
{
- // TODO
- return 0l;
+ return _obj.getDelivered();
}
public UUID getId()
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Tue Mar 23 18:00:49 2010
@@ -110,7 +110,7 @@ public class AMQBrokerManagerMBean exten
public String[] getExchangeTypes() throws IOException
{
ArrayList<String> exchangeTypes = new ArrayList<String>();
- for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getRegisteredTypes())
+ for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes())
{
exchangeTypes.add(ex.getName().toString());
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Mar 23 18:00:49 2010
@@ -200,16 +200,22 @@ public class AMQChannel implements Sessi
private void incrementOutstandingTxnsIfNecessary()
{
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 1 if 0.
- _txnCount.compareAndSet(0,1);
+ if(isTransactional())
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 1 if 0.
+ _txnCount.compareAndSet(0,1);
+ }
}
private void decrementOutstandingTxnsIfNecessary()
{
- //There can currently only be at most one outstanding transaction
- //due to only having LocalTransaction support. Set value to 0 if 1.
- _txnCount.compareAndSet(1,0);
+ if(isTransactional())
+ {
+ //There can currently only be at most one outstanding transaction
+ //due to only having LocalTransaction support. Set value to 0 if 1.
+ _txnCount.compareAndSet(1,0);
+ }
}
public Long getTxnStarts()
@@ -313,7 +319,7 @@ public class AMQChannel implements Sessi
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional()));
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
}
}
@@ -1031,7 +1037,7 @@ public class AMQChannel implements Sessi
}
- private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional)
+ private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
throws AMQException
{
@@ -1055,7 +1061,6 @@ public class AMQChannel implements Sessi
private class MessageDeliveryAction implements ServerTransaction.Action
{
- private boolean _transactional;
private IncomingMessage _incommingMessage;
private ArrayList<? extends BaseQueue> _destinationQueues;
@@ -1063,7 +1068,6 @@ public class AMQChannel implements Sessi
ArrayList<? extends BaseQueue> destinationQueues,
boolean transactional)
{
- _transactional = transactional;
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
}
@@ -1074,7 +1078,7 @@ public class AMQChannel implements Sessi
{
final boolean immediate = _incommingMessage.isImmediate();
- final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional);
+ final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
MessageReference ref = amqMessage.newReference();
for(final BaseQueue queue : _destinationQueues)
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Tue Mar 23 18:00:49 2010
@@ -37,7 +37,7 @@ public class Binding
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
- Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
_id = id;
_bindingKey = bindingKey;
@@ -89,29 +89,30 @@ public class Binding
@Override
public boolean equals(final Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || !(o instanceof Binding))
+ {
+ return false;
+ }
final Binding binding = (Binding) o;
- if (!_bindingKey.equals(binding._bindingKey)) return false;
- if (!_exchange.equals(binding._exchange)) return false;
- if (!_queue.equals(binding._queue)) return false;
-
- return true;
+ return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey()))
+ && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange()))
+ && (_queue == null ? binding.getQueue() == null : _queue.equals(binding.getQueue()));
}
@Override
public int hashCode()
{
- int result = _bindingKey.hashCode();
- result = 31 * result + _queue.hashCode();
- result = 31 * result + _exchange.hashCode();
+ int result = _bindingKey == null ? 1 : _bindingKey.hashCode();
+ result = 31 * result + (_queue == null ? 3 : _queue.hashCode());
+ result = 31 * result + (_exchange == null ? 5 : _exchange.hashCode());
return result;
}
-
-
-
-
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Tue Mar 23 18:00:49 2010
@@ -75,6 +75,10 @@ public interface QueueConfig extends Con
long getPersistentMsgEnqueues();
long getPersistentMsgDequeues();
+
+ long getUnackedMessageCount();
+
+ long getUnackedMessageCountHigh();
void purge(long request);
}
\ No newline at end of file
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SubscriptionConfig.java Tue Mar 23 18:00:49 2010
@@ -43,5 +43,5 @@ public interface SubscriptionConfig exte
boolean isExplicitAcknowledge();
-
+ Long getDelivered();
}
\ No newline at end of file
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Mar 23 18:00:49 2010
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
@@ -30,10 +34,6 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
@@ -60,6 +60,21 @@ public class DefaultExchangeFactory impl
{
return _exchangeClassMap.values();
}
+
+ public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
+ {
+ Collection<ExchangeType<? extends Exchange>> publicTypes =
+ new ArrayList<ExchangeType<? extends Exchange>>();
+ publicTypes.addAll(_exchangeClassMap.values());
+
+ //Remove the ManagementExchange type if present, as these
+ //are private and cannot be created by external means
+ publicTypes.remove(ManagementExchange.TYPE);
+
+ return publicTypes;
+ }
+
+
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
throws AMQException
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Tue Mar 23 18:00:49 2010
@@ -38,6 +38,8 @@ public interface ExchangeFactory
void initialise(VirtualHostConfiguration hostConfig);
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
+
+ Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Mar 23 18:00:49 2010
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
/**
@@ -38,69 +39,35 @@ class HeadersBinding
private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
private final FieldTable _mappings;
+ private final Binding _binding;
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
- private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
- {
- private Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
- private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
- {
- Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if(required.contains(propertyName))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
-
-
/**
- * Creates a binding for a set of mappings. Those mappings whose value is
+ * Creates a header binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- * @param mappings the defined mappings this binding should use
+ *
+ * @param binding the binding to create a header binding using
*/
-
- HeadersBinding(FieldTable mappings)
+ public HeadersBinding(Binding binding)
{
- _mappings = mappings;
- initMappings();
+ _binding = binding;
+ if(_binding !=null)
+ {
+ _mappings = FieldTable.convertToFieldTable(_binding.getArguments());
+ initMappings();
+ }
+ else
+ {
+ _mappings = null;
+ }
}
-
+
private void initMappings()
{
-
_mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
@@ -133,6 +100,11 @@ class HeadersBinding
{
return _mappings;
}
+
+ public Binding getBinding()
+ {
+ return _binding;
+ }
/**
* Checks whether the supplied headers match the requirements of this binding
@@ -250,4 +222,39 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-}
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null)
+ {
+ return false;
+ }
+
+ if (!(o instanceof HeadersBinding))
+ {
+ return false;
+ }
+
+ final HeadersBinding hb = (HeadersBinding) o;
+
+ if(_binding == null)
+ {
+ if(hb.getBinding() != null)
+ {
+ return false;
+ }
+ }
+ else if (!_binding.equals(hb.getBinding()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Mar 23 18:00:49 2010
@@ -24,8 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Bi
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.List;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
/**
* An exchange that binds queues based on a set of required headers and header values
@@ -72,7 +71,14 @@ public class HeadersExchange extends Abs
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
+
+ private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
+ new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+
+ private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
+ new CopyOnWriteArrayList<HeadersBinding>();
+
public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
{
@@ -102,34 +108,12 @@ public class HeadersExchange extends Abs
}
};
-
- private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
- private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>();
-
-
public HeadersExchange()
{
super(TYPE);
}
+
- public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
- {
- registerQueue(new AMQShortString(routingKey), queue, FieldTable.convertToFieldTable(args));
- }
-
- public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args)
- {
- _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + " with " + args);
-
- Registration registration = new Registration(new HeadersBinding(args), queue, routingKey);
- _bindings.add(registration);
-
- }
-
- public void deregisterQueue(String routingKey, AMQQueue queue, Map<String,Object> args)
- {
- _bindings.remove(new Registration(args == null ? null : new HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new AMQShortString(routingKey)));
- }
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -138,24 +122,27 @@ public class HeadersExchange extends Abs
{
_logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
}
- boolean routed = false;
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
- for (Registration e : _bindings)
+
+ LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
+
+ for (HeadersBinding hb : _bindingHeaderMatchers)
{
-
- if (e.binding.matches(header))
+ if (hb.matches(header))
{
+ Binding b = hb.getBinding();
+
+ b.incrementMatches();
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- header + " to " + e.queue.getNameShortString());
+ header + " to " + b.getQueue().getNameShortString());
}
- queues.add(e.queue);
-
- routed = true;
+ queues.add(b.getQueue());
}
}
- return queues;
+
+ return new ArrayList<BaseQueue>(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -166,38 +153,49 @@ public class HeadersExchange extends Abs
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return isBound(queue);
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+
+ if(bindings != null)
+ {
+ for(Binding binding : bindings)
+ {
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
public boolean isBound(AMQShortString routingKey)
{
- return hasBindings();
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ return bindings != null && !bindings.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (Registration r : _bindings)
+ for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
{
- if (r.queue.equals(queue))
+ for(Binding binding : bindings)
{
- return true;
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
}
}
+
return false;
}
public boolean hasBindings()
{
- return !_bindings.isEmpty();
- }
-
-
-
- protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
- {
- //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
- //but these are not yet implemented.
- return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+ return !getBindings().isEmpty();
}
protected AbstractExchangeMBean createMBean() throws JMException
@@ -210,59 +208,51 @@ public class HeadersExchange extends Abs
return _logger;
}
-
- static class Registration
+ protected void onBind(final Binding binding)
{
- private final HeadersBinding binding;
- private final AMQQueue queue;
- private final AMQShortString routingKey;
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getQueue();
+ AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
+ Map<String,Object> args = binding.getArguments();
- Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey)
- {
- this.binding = binding;
- this.queue = queue;
- this.routingKey = routingKey;
- }
-
- public int hashCode()
- {
- int queueHash = queue.hashCode();
- int routingHash = routingKey == null ? 0 : routingKey.hashCode();
- return queueHash + routingHash;
- }
+ assert queue != null;
+ assert routingKey != null;
- public boolean equals(Object o)
- {
- return o instanceof Registration
- && ((Registration) o).queue.equals(queue)
- && (routingKey == null ? ((Registration)o).routingKey == null
- : routingKey.equals(((Registration)o).routingKey));
- }
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- public HeadersBinding getBinding()
+ if(bindings == null)
{
- return binding;
+ bindings = new CopyOnWriteArraySet<Binding>();
+ CopyOnWriteArraySet<Binding> newBindings;
+ if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
+ {
+ bindings = newBindings;
+ }
}
-
- public AMQQueue getQueue()
+
+ if(_logger.isDebugEnabled())
{
- return queue;
+ _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
+ " with binding key '" +bindingKey + "' and args: " + args);
}
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- }
+ _bindingHeaderMatchers.add(new HeadersBinding(binding));
+ bindings.add(binding);
- protected void onBind(final Binding binding)
- {
- registerQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
}
protected void onUnbind(final Binding binding)
{
- deregisterQueue(binding.getBindingKey(), binding.getQueue(), binding.getArguments());
+ assert binding != null;
+
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+ if(bindings != null)
+ {
+ bindings.remove(binding);
+ }
+
+ _logger.debug("===============");
+ _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding)));
}
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Mar 23 18:00:49 2010
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.topic.*;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.message.InboundMessage;
@@ -83,7 +84,7 @@ public class TopicExchange extends Abstr
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
- private final Map<TopicBinding, FieldTable> _bindings = new HashMap<TopicBinding, FieldTable>();
+ private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
@@ -92,20 +93,12 @@ public class TopicExchange extends Abstr
super(TYPE);
}
- public synchronized void registerQueue(String rKey, AMQQueue queue, Map<String,Object> args)
- {
- try
- {
- registerQueue(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args));
- }
- catch (AMQInvalidArgumentException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQInvalidArgumentException
+ protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
{
+ AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
+ AMQQueue queue = binding.getQueue();
+ FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
+
assert queue != null;
assert rKey != null;
@@ -114,8 +107,6 @@ public class TopicExchange extends Abstr
AMQShortString routingKey = TopicNormalizer.normalize(rKey);
- TopicBinding binding = new TopicBinding(rKey, queue, args);
-
if(_bindings.containsKey(binding))
{
FieldTable oldArgs = _bindings.get(binding);
@@ -146,6 +137,8 @@ public class TopicExchange extends Abstr
return;
}
}
+
+ result.addBinding(binding);
}
else
@@ -177,6 +170,8 @@ public class TopicExchange extends Abstr
result.addUnfilteredQueue(queue);
}
}
+
+ result.addBinding(binding);
_bindings.put(binding, args);
}
@@ -226,7 +221,8 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
- TopicBinding binding = new TopicBinding(routingKey, queue, arguments);
+ Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
+
if (arguments == null)
{
return _bindings.containsKey(binding);
@@ -253,7 +249,7 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQShortString routingKey)
{
- for(TopicBinding b : _bindings.keySet())
+ for(Binding b : _bindings.keySet())
{
if(b.getBindingKey().equals(routingKey))
{
@@ -266,7 +262,7 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQQueue queue)
{
- for(TopicBinding b : _bindings.keySet())
+ for(Binding b : _bindings.keySet())
{
if(b.getQueue().equals(queue))
{
@@ -282,19 +278,16 @@ public class TopicExchange extends Abstr
return !_bindings.isEmpty();
}
-
- public void deregisterQueue(String rKey, AMQQueue queue, Map<String, Object> args)
- {
- removeBinding(new TopicBinding(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args)));
- }
-
- private boolean removeBinding(final TopicBinding binding)
+ private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
{
FieldTable bindingArgs = _bindings.remove(binding);
- AMQShortString bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
+ AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+
+ result.removeBinding(binding);
+
if(argumentsContainSelector(bindingArgs))
{
try
@@ -341,8 +334,14 @@ public class TopicExchange extends Abstr
Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
+ TopicExchangeResult res = (TopicExchangeResult)result;
- queues = ((TopicExchangeResult)result).processMessage(message, queues);
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
+ queues = res.processMessage(message, queues);
}
return queues;
}
@@ -350,14 +349,21 @@ public class TopicExchange extends Abstr
}
- protected void onBind(final org.apache.qpid.server.binding.Binding binding)
+ protected void onBind(final Binding binding)
{
- registerQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+ try
+ {
+ registerQueue(binding);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- protected void onUnbind(final org.apache.qpid.server.binding.Binding binding)
+ protected void onUnbind(final Binding binding)
{
- deregisterQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+ deregisterQueue(binding);
}
}
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Tue Mar 23 18:00:49 2010
@@ -21,14 +21,22 @@
package org.apache.qpid.server.exchange.topic;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public final class TopicExchangeResult implements TopicMatcherResult
{
+ private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
@@ -64,6 +72,20 @@ public final class TopicExchangeResult i
return _unfilteredQueues.keySet();
}
+ public void addBinding(Binding binding)
+ {
+ _bindings.add(binding);
+ }
+
+ public void removeBinding(Binding binding)
+ {
+ _bindings.remove(binding);
+ }
+
+ public List<Binding> getBindings()
+ {
+ return new ArrayList<Binding>(_bindings);
+ }
public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
{
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606
Propchange: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 23 18:00:49 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,919043-926606
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 23 18:00:49 2010
@@ -111,6 +111,7 @@ public interface AMQQueue extends Managa
void dequeue(QueueEntry entry, Subscription sub);
+ void decrementUnackedMsgCount();
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
Modified: qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=926686&r1=926685&r2=926686&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 23 18:00:49 2010
@@ -226,22 +226,29 @@ public class QueueEntryImpl implements Q
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
- if(!getQueue().isDeleted())
+ EntryState state = _state;
+
+ if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(state instanceof SubscriptionAcquiredState)
{
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ getQueue().decrementUnackedMsgCount();
}
+
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ }
+ else if(acquire())
+ {
+ routeToAlternate();
+ }
}
- else if(acquire())
- {
- routeToAlternate();
- }
-
-
}
public boolean releaseButRetain()
@@ -369,6 +376,7 @@ public class QueueEntryImpl implements Q
Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
+ getQueue().decrementUnackedMsgCount();
s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org