You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [9/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py Fri Sep 20 18:59:30 2013
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, s
import traceback
from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from brokertest import *
+from ha_test import HaPort
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
def setUp(self):
BrokerTest.setUp(self)
os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
- self.broker = self.amqp_broker()
+ self.port_holder = HaPort(self)
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
@@ -126,6 +128,9 @@ class AmqpBrokerTest(BrokerTest):
def test_translate2(self):
self.send_and_receive(send_config=Config(self.broker, version="amqp0-10"))
+ def test_translate_with_large_routingkey(self):
+ self.send_and_receive(send_config=Config(self.broker, address="amq.topic/a.%s" % ("x" * 256), version="amqp1.0"), recv_config=Config(self.broker, address="amq.topic/a.*", version="amqp0-10"), wait_for_receiver=True)
+
def send_and_receive_empty(self, send_config=None, recv_config=None):
sconfig = send_config or self.default_config
rconfig = recv_config or self.default_config
@@ -218,16 +223,22 @@ class AmqpBrokerTest(BrokerTest):
assert len(domains) == 1
assert domains[0].name == "BrokerB"
- def test_incoming_link(self):
+ def incoming_link(self, mechanism):
brokerB = self.amqp_broker()
agentB = BrokerAgent(brokerB.connect())
self.agent.create("queue", "q")
agentB.create("queue", "q")
- self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism})
self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
#send to brokerB, receive from brokerA
self.send_and_receive(send_config=Config(brokerB))
+ def test_incoming_link_anonymous(self):
+ self.incoming_link("ANONYMOUS")
+
+ def test_incoming_link_nosasl(self):
+ self.incoming_link("NONE")
+
def test_outgoing_link(self):
brokerB = self.amqp_broker()
agentB = BrokerAgent(brokerB.connect())
@@ -246,14 +257,73 @@ class AmqpBrokerTest(BrokerTest):
#send to q on broker B through brokerA
self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
+ def test_reconnect(self):
+ receiver_cmd = ["qpid-receive",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
+ "--timeout=10", "--print-content=true", "--print-headers=false"
+ ]
+ receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+ sender_cmd = ["qpid-send",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+ "--content-stdin", "--send-eos=1"
+ ]
+ sender = self.popen(sender_cmd, stdin=PIPE)
+ sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
+
+
+ batch1 = ["message-%s" % (i+1) for i in range(10000)]
+ for m in batch1:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ self.broker.kill()
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+ batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+ for m in batch2:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ sender.stdin.close()
+
+ last = None
+ m = receiver.stdout.readline().rstrip()
+ while len(m):
+ last = m
+ m = receiver.stdout.readline().rstrip()
+ assert last == "message-20000", (last)
+
""" Create and return a broker with AMQP 1.0 support """
def amqp_broker(self):
assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ self.port_holder = HaPort(self) #reserve port
args = ["--load-module", BrokerTest.amqp_lib,
- "--max-negotiate-time=600000",
+ "--socket-fd=%s" % self.port_holder.fileno,
+ "--listen-disable=tcp",
"--log-enable=trace+:Protocol",
"--log-enable=info+"]
- return BrokerTest.broker(self, args)
+ return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+ def amqp_broker(self, port_holder=None):
+ assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ if port_holder:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--socket-fd=%s" % port_holder.fileno,
+ "--listen-disable=tcp",
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args, port=port_holder.port)
+ else:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args)
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp Fri Sep 20 18:59:30 2013
@@ -69,11 +69,12 @@ struct Options : public qpid::Options
string readyAddress;
uint receiveRate;
std::string replyto;
+ bool noReplies;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
- url("amqp:tcp:127.0.0.1"),
+ url("127.0.0.1"),
timeout(0),
forever(false),
messages(0),
@@ -91,7 +92,8 @@ struct Options : public qpid::Options
reportTotal(false),
reportEvery(0),
reportHeader(true),
- receiveRate(0)
+ receiveRate(0),
+ noReplies(false)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -116,6 +118,7 @@ struct Options : public qpid::Options
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
+ ("ignore-reply-to", qpid::optValue(noReplies), "Do not send replies even if reply-to is set")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -222,6 +225,7 @@ int main(int argc, char ** argv)
if (opts.printHeaders) {
if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl;
if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl;
+ if (msg.getMessageId().size()) std::cout << "MessageId: " << msg.getMessageId() << std::endl;
if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
@@ -231,8 +235,10 @@ int main(int argc, char ** argv)
std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
- if (opts.printContent)
- std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+ if (opts.printContent) {
+ if (!msg.getContentObject().isVoid()) std::cout << msg.getContentObject() << std::endl;
+ else std::cout << msg.getContent() << std::endl;
+ }
if (opts.messages && count >= opts.messages) done = true;
}
}
@@ -245,7 +251,7 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
- if (msg.getReplyTo()) { // Echo message back to reply-to address.
+ if (msg.getReplyTo() && !opts.noReplies) { // Echo message back to reply-to address.
Sender& s = replyTo[msg.getReplyTo().str()];
if (s.isNull()) {
s = session.createSender(msg.getReplyTo());
@@ -260,8 +266,6 @@ int main(int argc, char ** argv)
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
- // Clear out message properties & content for next iteration.
- msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
}
if (opts.reportTotal) reporter.report();
if (opts.tx) {
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp Fri Sep 20 18:59:30 2013
@@ -96,7 +96,7 @@ struct Options : public qpid::Options
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
- url("amqp:tcp:127.0.0.1"),
+ url("127.0.0.1"),
messages(1),
sendEos(0),
durable(false),
@@ -262,9 +262,8 @@ class MapContentGenerator : public Con
public:
MapContentGenerator(const Options& opt) : opts(opt) {}
virtual bool setContent(Message& msg) {
- Variant::Map map;
- opts.setEntries(map);
- encode(map, msg);
+ msg.getContentObject() = qpid::types::Variant::Map();
+ opts.setEntries(msg.getContentObject().asMap());
return true;
}
private:
@@ -371,6 +370,7 @@ int main(int argc, char ** argv)
msg.setReplyTo(Address(opts.replyto));
}
if (!opts.userid.empty()) msg.setUserId(opts.userid);
+ if (!opts.id.empty()) msg.setMessageId(opts.id);
if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
opts.setProperties(msg);
uint sent = 0;
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py Fri Sep 20 18:59:30 2013
@@ -39,20 +39,25 @@ class ConsoleTest(BrokerTest):
def setUp(self):
BrokerTest.setUp(self)
- args = ["--mgmt-qmf1=no",
- "--mgmt-pub-interval=%d" % self.PUB_INTERVAL]
+
+ def _startBroker(self, QMFv1=False ):
+ self._broker_is_v1 = QMFv1
+ if self._broker_is_v1:
+ args = ["--mgmt-qmf1=yes", "--mgmt-qmf2=no"]
+ else:
+ args = ["--mgmt-qmf1=no", "--mgmt-qmf2=yes"]
+
+ args.append("--mgmt-pub-interval=%d" % self.PUB_INTERVAL)
self.broker = BrokerTest.broker(self, args)
- def _startQmfV2(self, broker, console=None):
+
+ def _myStartQmf(self, broker, console=None):
# I manually set up the QMF session here rather than call the startQmf
# method from BrokerTest as I can guarantee the console library is used
# (assuming BrokerTest's implementation of startQmf could change)
self.qmf_session = qmf.console.Session(console)
self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(),
broker.port()))
- self.assertEqual(self.qmf_broker.getBrokerAgent().isV2, True,
- "Expected broker agent to support QMF V2")
-
def _create_queue( self, q_name, args={} ):
broker = self.qmf_session.getObjects(_class="broker")[0]
@@ -60,11 +65,11 @@ class ConsoleTest(BrokerTest):
self.assertEqual(result.status, 0, result)
- def test_method_call(self):
+ def _test_method_call(self):
""" Verify method calls work, and check the behavior of getObjects()
call
"""
- self._startQmfV2( self.broker )
+ self._myStartQmf( self.broker )
self._create_queue( "fleabag", {"auto-delete":True} )
qObj = None
@@ -76,12 +81,16 @@ class ConsoleTest(BrokerTest):
self.assertNotEqual(qObj, None, "Failed to get queue object")
#print qObj
- def test_unsolicited_updates(self):
+ def _test_unsolicited_updates(self):
""" Verify that the Console callbacks work
"""
class Handler(qmf.console.Console):
def __init__(self):
+ self.v1_oids = 0
+ self.v1_events = 0
+ self.v2_oids = 0
+ self.v2_events = 0
self.broker_info = []
self.broker_conn = []
self.newpackage = []
@@ -109,28 +118,38 @@ class ConsoleTest(BrokerTest):
def event(self, broker, event):
#print "EVENT %s" % event
self.events.append(event)
- def objectProps(self, broker, record):
- #print "ObjProps %s" % record
- assert len(record.getProperties()), "objectProps() invoked with no properties?"
+ if event.isV2:
+ self.v2_events += 1
+ else:
+ self.v1_events += 1
+
+ def heartbeat(self, agent, timestamp):
+ #print "Heartbeat %s" % agent
+ self.heartbeats.append( (agent, timestamp) )
+
+ # generic handler for objectProps and objectStats
+ def _handle_obj_update(self, record):
oid = record.getObjectId()
+ if oid.isV2:
+ self.v2_oids += 1
+ else:
+ self.v1_oids += 1
+
if oid not in self.updates:
self.updates[oid] = record
else:
self.updates[oid].mergeUpdate( record )
+
+ def objectProps(self, broker, record):
+ assert len(record.getProperties()), "objectProps() invoked with no properties?"
+ self._handle_obj_update(record)
+
def objectStats(self, broker, record):
- #print "ObjStats %s" % record
assert len(record.getStatistics()), "objectStats() invoked with no properties?"
- oid = record.getObjectId()
- if oid not in self.updates:
- self.updates[oid] = record
- else:
- self.updates[oid].mergeUpdate( record )
- def heartbeat(self, agent, timestamp):
- #print "Heartbeat %s" % agent
- self.heartbeats.append( (agent, timestamp) )
+ self._handle_obj_update(record)
handler = Handler()
- self._startQmfV2( self.broker, handler )
+ self._myStartQmf( self.broker, handler )
# this should force objectProps, queueDeclare Event callbacks
self._create_queue( "fleabag", {"auto-delete":True} )
# this should force objectStats callback
@@ -163,7 +182,15 @@ class ConsoleTest(BrokerTest):
break
assert msgs == 3, "msgDepth statistics not accurate!"
- def test_async_method(self):
+ # verify that the published objects were of the correct QMF version
+ if self._broker_is_v1:
+ assert handler.v1_oids and handler.v2_oids == 0, "QMFv2 updates received while in V1-only mode!"
+ assert handler.v1_events and handler.v2_events == 0, "QMFv2 events received while in V1-only mode!"
+ else:
+ assert handler.v2_oids and handler.v1_oids == 0, "QMFv1 updates received while in V2-only mode!"
+ assert handler.v2_events and handler.v1_events == 0, "QMFv1 events received while in V2-only mode!"
+
+ def _test_async_method(self):
class Handler (qmf.console.Console):
def __init__(self):
self.cv = Condition()
@@ -207,12 +234,40 @@ class ConsoleTest(BrokerTest):
return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
handler = Handler()
- self._startQmfV2(self.broker, handler)
+ self._myStartQmf(self.broker, handler)
broker = self.qmf_session.getObjects(_class="broker")[0]
handler.request(broker, 20)
sleep(1)
self.assertEqual(handler.check(), "pass")
+ def test_method_call(self):
+ self._startBroker()
+ self._test_method_call()
+
+ def test_unsolicited_updates(self):
+ self._startBroker()
+ self._test_unsolicited_updates()
+
+ def test_async_method(self):
+ self._startBroker()
+ self._test_async_method()
+
+ # For now, include "QMFv1 only" tests. Once QMFv1 is deprecated, these can
+ # be removed
+
+ def test_method_call_v1(self):
+ self._startBroker(QMFv1=True)
+ self._test_method_call()
+
+ def test_unsolicited_updates_v1(self):
+ self._startBroker(QMFv1=True)
+ self._test_unsolicited_updates()
+
+ def test_async_method_v1(self):
+ self._startBroker(QMFv1=True)
+ self._test_async_method()
+
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpidt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpidt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpidt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpidt Fri Sep 20 18:59:30 2013
@@ -117,7 +117,7 @@ class Manager:
if k == "name":
name = v
elif v:
- if isinstance(v, dict) and v["_object_name"]:
+ if isinstance(v, dict) and "_object_name" in v:
v = v["_object_name"]
details += "%s=%s " %(k,v)
print "%-25s %s" % (name, details)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py Fri Sep 20 18:59:30 2013
@@ -27,6 +27,8 @@ from os import environ, popen
class QueueFlowLimitTests(TestBase010):
+ _timeout = 100
+
def __getattr__(self, name):
if name == "assertGreater":
return lambda a, b: self.failUnless(a > b)
@@ -156,7 +158,7 @@ class QueueFlowLimitTests(TestBase010):
totalMsgs = 1213 + 797 + 331
# wait until flow control is active
- deadline = time() + 10
+ deadline = time() + self._timeout
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
@@ -209,7 +211,7 @@ class QueueFlowLimitTests(TestBase010):
totalMsgs = 1699 + 1129 + 881
# wait until flow control is active
- deadline = time() + 10
+ deadline = time() + self._timeout
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
@@ -255,7 +257,7 @@ class QueueFlowLimitTests(TestBase010):
# fill up the queue, waiting until flow control is active
sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
- deadline = time() + 10
+ deadline = time() + self._timeout
while (not testq.mgmt.flowStopped) and time() < deadline:
testq.mgmt.update()
@@ -357,7 +359,7 @@ class QueueFlowLimitTests(TestBase010):
sender = BlockedSender(self, "kill-q", count=100)
# wait for flow control
- deadline = time() + 10
+ deadline = time() + self._timeout
while (not q.flowStopped) and time() < deadline:
q.update()
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts Fri Sep 20 18:59:30 2013
@@ -27,13 +27,21 @@ abspath() {
}
usage() {
- echo "Usage: $(basename $0) file [file...]
+ echo "Usage: $(basename $0) [-l user] file [file...]
Synchronize the contents of each file or directory to the same absolute path on
each host in \$HOSTS.
"
exit 1
}
+while getopts "l:" opt; do
+ case $opt in
+ l) RSYNC_USER="$OPTARG@" ;;
+ *) usage ;;
+ esac
+done
+shift `expr $OPTIND - 1`
+
test "$*" || usage
for f in $*; do FILES="$FILES $(abspath $f)" || exit 1; done
@@ -42,7 +50,7 @@ OK_FILE=`mktemp` # Will be deleted if a
trap "rm -f $OK_FILE" EXIT
for h in $HOSTS; do
- rsync -aRO --delete $FILES $h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } &
+ rsync -vaRO --delete $FILES $RSYNC_USER$h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } &
done
wait
test -f $OK_FILE
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests Fri Sep 20 18:59:30 2013
@@ -44,8 +44,8 @@ start_brokers() {
# look like they're xml related.
# if we start supporting xml on windows, it will need something similar
# here
- if [ -f ../.libs/xml.so ] ; then
- xargs="--load-module ../.libs/xml.so"
+ if [ -f ../xml.so ] ; then
+ xargs="--load-module ../xml.so"
if [ ! -f test.xquery ] ; then
create_test_xquery
fi
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests Fri Sep 20 18:59:30 2013
@@ -25,7 +25,7 @@ source ./test_env.sh
#set -x
trap stop_brokers INT TERM QUIT
-if [ -f ../.libs/xml.so ] ; then
+if [ -f ../xml.so ] ; then
MODULES="--load-module xml" # Load the XML exchange and run XML exchange federation tests
SKIPTESTS=
else
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in Fri Sep 20 18:59:30 2013
@@ -41,7 +41,6 @@ $PYTHON_COMMANDS="$QPID_TOOLS\src\py"
$env:PYTHONPATH="$srcdir;$PYTHON_DIR;$PYTHON_COMMANDS;$QPID_TESTS_PY;$QPID_TOOLS_LIBS;$QMF_LIB;$env:PYTHONPATH"
$QPID_CONFIG_EXEC="$PYTHON_COMMANDS\qpid-config"
$QPID_ROUTE_EXEC="$PYTHON_COMMANDS\qpid-route"
-$QPID_CLUSTER_EXEC="$PYTHON_COMMANDS\qpid-cluster"
$QPID_HA_TOOL_EXEC="$PYTHON_COMMANDS\qpid-ha-tool"
# Executables
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in Fri Sep 20 18:59:30 2013
@@ -25,6 +25,8 @@ builddir=`absdir @abs_builddir@`
top_srcdir=`absdir @abs_top_srcdir@`
top_builddir=`absdir @abs_top_builddir@`
moduledir=$top_builddir/src@builddir_lib_suffix@
+pythonswigdir=$top_builddir/bindings/qpid/python/
+pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
testmoduledir=$builddir@builddir_lib_suffix@
export QPID_INSTALL_PREFIX=@prefix@
@@ -44,7 +46,8 @@ export PYTHONPATH=$srcdir:$PYTHON_DIR:$P
export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha
-
+export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir
+export PYTHONSWIGMODULE=$pythonswigdir/qpid_messaging.py
# Executables
export QPIDD_EXEC=$top_builddir/src/qpidd
@@ -78,5 +81,5 @@ if [ ! -e "$HOME" ]; then
fi
# Options for boost test framework
-export BOOST_TEST_SHOW_PROGRESS=yes
-export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
+test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes
+test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp Fri Sep 20 18:59:30 2013
@@ -40,14 +40,19 @@
#include "qpid/sys/Thread.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Msg.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
+#include <ostream>
#include <fstream>
+#include <sstream>
-using namespace qpid;
-using namespace broker;
using namespace std;
+using namespace boost;
+using namespace qpid;
+using namespace qpid::broker;
using namespace qpid::sys;
namespace qpid {
@@ -57,11 +62,19 @@ struct TestStoreOptions : public Options
string name;
string dump;
+ string events;
+ vector<string> throwMsg; // Throw exception if message content matches.
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
- ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+ ("test-store-name", optValue(name, "NAME"),
+ "Name of test store instance.")
+ ("test-store-dump", optValue(dump, "FILE"),
+ "File to dump enqueued messages.")
+ ("test-store-events", optValue(events, "FILE"),
+ "File to log events, 1 line per event.")
+ ("test-store-throw", optValue(throwMsg, "CONTENT"),
+ "Throw exception if message content matches.")
;
}
};
@@ -82,24 +95,76 @@ class TestStore : public NullMessageStor
TestStore(const TestStoreOptions& opts, Broker& broker_)
: options(opts), name(opts.name), broker(broker_)
{
- QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
- if (!options.dump.empty())
+ QPID_LOG(info, "TestStore name=" << name
+ << " dump=" << options.dump
+ << " events=" << options.events
+ << " throw messages =" << options.throwMsg.size());
+
+ if (!options.dump.empty())
dump.reset(new ofstream(options.dump.c_str()));
+ if (!options.events.empty())
+ events.reset(new ofstream(options.events.c_str()));
}
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
}
- virtual bool isNull() const { return false; }
-
- void enqueue(TransactionContext* ,
+ // Dummy transaction context.
+ struct TxContext : public TPCTransactionContext {
+ static int nextId;
+ string id;
+ TxContext() : id(lexical_cast<string>(nextId++)) {}
+ TxContext(string xid) : id(xid) {}
+ };
+
+ static string getId(const TransactionContext& tx) {
+ const TxContext* tc = dynamic_cast<const TxContext*>(&tx);
+ assert(tc);
+ return tc->id;
+ }
+
+
+ bool isNull() const { return false; }
+
+ void log(const string& msg) {
+ QPID_LOG(info, "test_store: " << msg);
+ if (events.get()) *events << msg << endl << std::flush;
+ }
+
+ auto_ptr<TransactionContext> begin() {
+ auto_ptr<TxContext> tx(new TxContext());
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TransactionContext>(tx);
+ }
+
+ auto_ptr<TPCTransactionContext> begin(const std::string& xid) {
+ auto_ptr<TxContext> tx(new TxContext(xid));
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TPCTransactionContext>(tx);
+ }
+
+ string getContent(const intrusive_ptr<PersistableMessage>& msg) {
+ intrusive_ptr<broker::Message::Encoding> enc(
+ dynamic_pointer_cast<broker::Message::Encoding>(msg));
+ return enc->getContent();
+ }
+
+ void enqueue(TransactionContext* tx,
const boost::intrusive_ptr<PersistableMessage>& pmsg,
- const PersistableQueue& )
+ const PersistableQueue& queue)
{
- qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
+ QPID_LOG(debug, "TestStore enqueue " << queue.getName());
+ qpid::broker::amqp_0_10::MessageTransfer* msg =
+ dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
+ ostringstream o;
+ o << "<enqueue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+
// Dump the message if there is a dump file.
if (dump.get()) {
msg->getFrames().getMethod()->print(*dump);
@@ -113,7 +178,11 @@ class TestStore : public NullMessageStor
string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
- if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
+ const vector<string>& throwMsg(options.throwMsg);
+ if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
&& (i = data.find(name+"[")) != string::npos
&& (j = data.find("]", i)) != string::npos)
{
@@ -144,6 +213,31 @@ class TestStore : public NullMessageStor
msg->enqueueComplete();
}
+ void dequeue(TransactionContext* tx,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+ {
+ QPID_LOG(debug, "TestStore dequeue " << queue.getName());
+ ostringstream o;
+ o<< "<dequeue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+ }
+
+ void prepare(TPCTransactionContext& txn) {
+ log(Msg() << "<prepare tx=" << getId(txn) << ">");
+ }
+
+ void commit(TransactionContext& txn) {
+ log(Msg() << "<commit tx=" << getId(txn) << ">");
+ }
+
+ void abort(TransactionContext& txn) {
+ log(Msg() << "<abort tx=" << getId(txn) << ">");
+ }
+
+
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
TestStoreOptions options;
@@ -151,8 +245,11 @@ class TestStore : public NullMessageStor
Broker& broker;
vector<Thread> threads;
std::auto_ptr<ofstream> dump;
+ std::auto_ptr<ofstream> events;
};
+int TestStore::TxContext::nextId(1);
+
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
const string TestStore::EXCEPTION = "exception";
const string TestStore::EXIT_PROCESS = "exit_process";
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h Fri Sep 20 18:59:30 2013
@@ -23,7 +23,6 @@
#include <limits.h> // Include before boost/test headers.
#include <boost/test/test_tools.hpp>
#include <boost/assign/list_of.hpp>
-#include <boost/assign/list_of.hpp>
#include <vector>
#include <set>
#include <ostream>
Modified: qpid/branches/linearstore/qpid/doc/book/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/Makefile?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/Makefile (original)
+++ qpid/branches/linearstore/qpid/doc/book/Makefile Fri Sep 20 18:59:30 2013
@@ -17,7 +17,7 @@
# under the License.
#
-DIRS = src/java-broker src/cpp-broker src/programming
+DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming
.PHONY: all $(DIRS)
Modified: qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc Fri Sep 20 18:59:30 2013
@@ -17,7 +17,7 @@
# under the License.
#
-BOOK=$(wildcard *Book.xml Programming-In-Apache-Qpid.xml)
+BOOK=$(wildcard *Book.xml Programming-In-Apache-Qpid.xml JMS-Performance-Test-Framework.xml)
XML=$(wildcard *.xml) $(wildcard ../common/*.xml)
IMAGES=$(wildcard images/*.png)
CSS=$(wilcard ../common/css/*.css)
Modified: qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml Fri Sep 20 18:59:30 2013
@@ -113,12 +113,12 @@ under the License.
been replicated to a backup, then it doesn't need to be replicated.
</para>
<variablelist>
- <title>Status of a HA broker</title>
+ <title>HA Broker States</title>
<varlistentry>
<term>Joining</term>
<listitem>
<para>
- Initial status of a new broker that has not yet connected to the primary.
+ Initial state of a new broker that has not yet connected to the primary.
</para>
</listitem>
</varlistentry>
@@ -299,7 +299,7 @@ ssl_addr = "ssl:" host [":" port]'
Specifies whether queues and exchanges are replicated by default.
<replaceable>VALUE</replaceable> is one of: <literal>none</literal>,
<literal>configuration</literal>, <literal>all</literal>.
- For details see <xref linkend="ha-creating-replicated"/>.
+ For details see <xref linkend="ha-replicate-values"/>.
</para>
</entry>
</row>
@@ -499,9 +499,8 @@ NOTE: fencing is not shown, you must con
<command>qpidd</command> broker to primary status.
</para>
<para>
- The <literal>resources</literal> section also defines a pair of virtual IP
- addresses on different sub-nets. One will be used for broker-to-broker
- communication, the other for client-to-broker.
+ The <literal>resources</literal> section also defines a virtual IP
+ address for clients.
</para>
<para>
To take advantage of the virtual IP addresses, <filename>qpidd.conf</filename>
Modified: qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml Fri Sep 20 18:59:30 2013
@@ -119,7 +119,7 @@
<row>
<entry>
<para>/rest/queue</para>
- <para>/rest/queue/<virtual host name>/>queue name></para>
+ <para>/rest/queue/<virtual host name>/<queue name></para>
</entry>
<entry>Rest service to manage queue(s)</entry>
<entry>Retrieves the details about the queue(s)</entry>
@@ -173,7 +173,7 @@
</row>
<row>
<entry>
- <para>/rest/message/*</para>
+ <para>/rest/message/<virtual host name>/<queue name></para>
</entry>
<entry>Rest service to manage messages(s)</entry>
<entry>Retrieves the details about the messages(s)</entry>
@@ -183,7 +183,7 @@
</row>
<row>
<entry>
- <para>/rest/message-content/*</para>
+ <para>/rest/message-content/<virtual host name>/<queue name></para>
</entry>
<entry>Rest service to retrieve message content</entry>
<entry>Retrieves the message content</entry>
@@ -275,22 +275,22 @@
all bindings for all queues for the given exchange in the virtual host.
</para>
<example>
- <title>Examples of queue creation using curl:</title>
+ <title>Examples of queue creation using curl (authenticating as user admin):</title>
<programlisting><![CDATA[
#create a durable queue
-curl -X PUT -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
+curl --user admin -X PUT -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
#create a durable priority queue
-curl -X PUT -d '{"durable":true,"type":"priority"}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
+curl --user admin -X PUT -d '{"durable":true,"type":"priority"}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
]]></programlisting>
</example><example>
<title>Example of binding a queue to an exchange using curl</title>
<programlisting><![CDATA[
-curl -X PUT -d '{}' http://localhost:8080/rest/binding/<vhostname>/<exchangename>/<queue-name>/<binding-name>
+curl --user admin -X PUT -d '{}' http://localhost:8080/rest/binding/<vhostname>/<exchangename>/<queue-name>/<binding-name>
]]></programlisting>
</example>
<para>
- NOTE: the above examples were performed after editing the
- <link linkend="Java-Broker-Configuring-And-Managing-HTTP-Management-Plugin-Configuration">HTTP Management Plugin Configuration</link>
- to enable HTTP Basic Authentication on connections not using SSL (i.e HTTPS).
+ NOTE: These curl examples utilise unsecure HTTP transport. To use the examples it is first necessary enable Basic
+ authentication for HTTP within the HTTP Management Configuration (it is off by default).
+ For details see <xref linkend="Java-Broker-Configuring-And-Managing-HTTP-Management-Plugin-Configuration"/>
</para>
</section>
Modified: qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml Fri Sep 20 18:59:30 2013
@@ -23,7 +23,7 @@
<!ENTITY qpidProgrammingBook "../../Programming-In-Apache-Qpid/html/">
<!ENTITY qpidCppBook "../../AMQP-Messaging-Broker-CPP-Book/html/">
-<!ENTITY qpidCurrentRelease "0.23">
+<!ENTITY qpidCurrentRelease "0.25">
<!ENTITY windowsBrokerDownloadFileName "qpid-java-broker-&qpidCurrentRelease;.zip">
<!ENTITY windowsExtractedBrokerDirName "qpid-broker-&qpidCurrentRelease;">
@@ -40,5 +40,5 @@
<!ENTITY oracleBdbProductOverviewUrl "http://www.oracle.com/technetwork/products/berkeleydb/overview/index-093405.html">
<!ENTITY oracleBdbRepGuideUrl "http://oracle.com/cd/E17277_02/html/ReplicationGuide/">
<!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/">
-<!ENTITY oracleBdbProductVersion "5.0.73">
+<!ENTITY oracleBdbProductVersion "5.0.84">
Modified: qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
## to you under the Apache License, Version 2.0 (the
## "License"); you may not use this file except in compliance
## with the License. You may obtain a copy of the License at
-##
+##
## http://www.apache.org/licenses/LICENSE-2.0
-##
+##
## Unless required by applicable law or agreed to in writing,
## software distributed under the License is distributed on an
## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ include(CheckLibraryExists)
include(CheckSymbolExists)
include(CheckFunctionExists)
include(CheckIncludeFiles)
+include(FindPythonInterp)
include(FindPythonLibs)
enable_testing()
@@ -52,6 +53,14 @@ set(SYSCONF_INSTALL_DIR etc CACHE PATH "
set(SHARE_INSTALL_DIR share CACHE PATH "Shared read only data directory")
set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory")
+# determine the location for installing the python packages
+if (PYTHONLIBS_FOUND)
+ execute_process(COMMAND ${PYTHON_EXECUTABLE}
+ -c "from distutils.sysconfig import get_python_lib; print get_python_lib(False)"
+ OUTPUT_VARIABLE PYTHON_SITELIB_PACKAGES
+ OUTPUT_STRIP_TRAILING_WHITESPACE)
+endif (PYTHONLIBS_FOUND)
+
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/src
@@ -107,7 +116,53 @@ install(TARGETS qpid-dispatch
file(GLOB headers "include/qpid/dispatch/*.h")
install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/dispatch)
install(FILES include/qpid/dispatch.h DESTINATION ${INCLUDE_INSTALL_DIR}/qpid)
+install(FILES etc/qpid-dispatch.conf DESTINATION ${SYSCONF_INSTALL_DIR})
+
+##
+## Python modules installation
+##
+set(PYTHON_STUBS_SOURCES
+ python/qpid/dispatch/stubs/__init__.py
+ python/qpid/dispatch/stubs/ioadapter.py
+ python/qpid/dispatch/stubs/logadapter.py
+)
+
+set(PYTHON_ROUTER_SOURCES
+ python/qpid/dispatch/router/link.py
+ python/qpid/dispatch/router/router_engine.py
+ python/qpid/dispatch/router/__init__.py
+ python/qpid/dispatch/router/adapter.py
+ python/qpid/dispatch/router/mobile.py
+ python/qpid/dispatch/router/node.py
+ python/qpid/dispatch/router/routing.py
+ python/qpid/dispatch/router/data.py
+ python/qpid/dispatch/router/configuration.py
+ python/qpid/dispatch/router/neighbor.py
+ python/qpid/dispatch/router/path.py
+ python/qpid/dispatch/router/binding.py
+)
+
+set(PYTHON_CONFIG_SOURCES
+ python/qpid/dispatch/config/parser.py
+ python/qpid/dispatch/config/__init__.py
+ python/qpid/dispatch/config/schema.py
+ python/qpid/dispatch/__init__.py
+)
+
+install(FILES ${PYTHON_STUBS_SOURCES}
+ DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/stubs)
+
+install(FILES ${PYTHON_ROUTER_SOURCES}
+ DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/router)
+
+install(FILES ${PYTHON_CONFIG_SOURCES}
+ DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/config)
+
+install(FILES python/qpid/__init__.py
+ DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid)
+install(FILES python/qpid/dispatch/__init__.py
+ DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch)
##
## Build Tests
##
Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Sep 20 18:59:30 2013
@@ -51,10 +51,11 @@ typedef enum {
} dx_direction_t;
-typedef struct dx_node_t dx_node_t;
-typedef struct dx_link_t dx_link_t;
+typedef struct dx_node_t dx_node_t;
+typedef struct dx_link_t dx_link_t;
+typedef struct dx_delivery_t dx_delivery_t;
-typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery);
+typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, dx_delivery_t *delivery);
typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link);
typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed);
typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node);
@@ -65,33 +66,78 @@ typedef struct {
void *type_context;
int allow_dynamic_creation;
- //
+ //=======================
// Node-Instance Handlers
+ //=======================
+
+ //
+ // rx_handler - Invoked when a new received delivery is avaliable for processing.
+ //
+ dx_container_delivery_handler_t rx_handler;
+
+ //
+ // disp_handler - Invoked when an existing delivery changes disposition
+ // or settlement state.
+ //
+ dx_container_delivery_handler_t disp_handler;
+
+ //
+ // incoming_handler - Invoked when an attach for a new incoming link is received.
+ //
+ dx_container_link_handler_t incoming_handler;
+
+ //
+ // outgoing_handler - Invoked when an attach for a new outgoing link is received.
+ //
+ dx_container_link_handler_t outgoing_handler;
+
+ //
+ // writable_handler - Invoked when an outgoing link is available for sending either
+ // deliveries or disposition changes. The handler must check the
+ // link's credit to determine whether (and how many) message
+ // deliveries may be sent.
//
- dx_container_delivery_handler_t rx_handler;
- dx_container_delivery_handler_t tx_handler;
- dx_container_delivery_handler_t disp_handler;
- dx_container_link_handler_t incoming_handler;
- dx_container_link_handler_t outgoing_handler;
- dx_container_link_handler_t writable_handler;
- dx_container_link_detach_handler_t link_detach_handler;
+ dx_container_link_handler_t writable_handler;
//
+ // link_detach_handler - Invoked when a link is detached.
+ //
+ dx_container_link_detach_handler_t link_detach_handler;
+
+ //===================
// Node-Type Handlers
+ //===================
+
+ //
+ // node_created_handler - Invoked when a new instance of the node-type is created.
//
dx_container_node_handler_t node_created_handler;
+
+ //
+ // node_destroyed_handler - Invoked when an instance of the node type is destroyed.
+ //
dx_container_node_handler_t node_destroyed_handler;
+
+ //
+ // inbound_conn_open_handler - Invoked when an incoming connection (via listener)
+ // is established.
+ //
dx_container_conn_handler_t inbound_conn_open_handler;
+
+ //
+ // outbound_conn_open_handler - Invoked when an outgoing connection (via connector)
+ // is established.
+ //
dx_container_conn_handler_t outbound_conn_open_handler;
} dx_node_type_t;
int dx_container_register_node_type(dx_dispatch_t *dispatch, const dx_node_type_t *nt);
-void dx_container_set_default_node_type(dx_dispatch_t *dispatch,
- const dx_node_type_t *nt,
- void *node_context,
- dx_dist_mode_t supported_dist);
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dispatch,
+ const dx_node_type_t *nt,
+ void *node_context,
+ dx_dist_mode_t supported_dist);
dx_node_t *dx_container_create_node(dx_dispatch_t *dispatch,
const dx_node_type_t *nt,
@@ -116,6 +162,26 @@ pn_terminus_t *dx_link_remote_target(dx_
void dx_link_activate(dx_link_t *link);
void dx_link_close(dx_link_t *link);
+/**
+ * Important: dx_delivery must never be called twice in a row without an intervening pn_link_advance.
+ * The Disatch architecture provides a hook for discovering when an outgoing link is writable
+ * and has credit. When a link is writable, a delivery is allocated, written, and advanced
+ * in one operation. If a backlog of pending deliveries is created, an assertion will be
+ * thrown.
+ */
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag);
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition);
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer);
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery);
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context);
+void *dx_delivery_context(dx_delivery_t *delivery);
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery);
+void dx_delivery_settle(dx_delivery_t *delivery);
+bool dx_delivery_settled(dx_delivery_t *delivery);
+bool dx_delivery_disp_changed(dx_delivery_t *delivery);
+uint64_t dx_delivery_disp(dx_delivery_t *delivery);
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery);
+
typedef struct dx_link_item_t dx_link_item_t;
Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h Fri Sep 20 18:59:30 2013
@@ -19,12 +19,13 @@
* under the License.
*/
-#include <proton/engine.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/container.h>
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
@@ -88,34 +89,94 @@ typedef enum {
DX_FIELD_REPLY_TO_GROUP_ID
} dx_message_field_t;
-//
-// Functions for allocation
-//
+
+/**
+ * Allocate a new message.
+ *
+ * @return A pointer to a dx_message_t that is the sole reference to a newly allocated
+ * message.
+ */
dx_message_t *dx_allocate_message(void);
-void dx_free_message(dx_message_t *qm);
-dx_message_t *dx_message_copy(dx_message_t *qm);
-int dx_message_persistent(dx_message_t *qm);
-int dx_message_in_memory(dx_message_t *qm);
-
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg);
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg);
-//
-// Functions for received messages
-//
-dx_message_t *dx_message_receive(pn_delivery_t *delivery);
-void dx_message_send(dx_message_t *msg, pn_link_t *link);
+/**
+ * Free a message reference. If this is the last reference to the message, free the
+ * message as well.
+ *
+ * @param msg A pointer to a dx_message_t that is no longer needed.
+ */
+void dx_free_message(dx_message_t *msg);
+
+/**
+ * Make a new reference to an existing message.
+ *
+ * @param msg A pointer to a dx_message_t referencing a message.
+ * @return A new pointer to the same referenced message.
+ */
+dx_message_t *dx_message_copy(dx_message_t *msg);
+
+/**
+ * Retrieve the delivery annotations from a message.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the message.
+ * The caller MUST NOT free the parsed field.
+ *
+ * @param msg Pointer to a received message.
+ * @return Pointer to the parsed field for the delivery annotations. If the message doesn't
+ * have delivery annotations, the return value shall be NULL.
+ */
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *msg);
+/**
+ * Set the delivery annotations for the message. If the message already has delivery annotations,
+ * they will be overwritten/replaced by the new field.
+ *
+ * @param msg Pointer to a receiver message.
+ * @param da Pointer to a composed field representing the new delivery annotations of the message.
+ * If null, the message will not have a delivery annotations field.
+ * IMPORTANT: The message will not take ownership of the composed field. The
+ * caller is responsible for freeing it after this call. Since the contents
+ * are copied into the message, it is safe to free the composed field
+ * any time after the call to this function.
+ */
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da);
+
+/**
+ * Receive message data via a delivery. This function may be called more than once on the same
+ * delivery if the message spans multiple frames. Once a complete message has been received, this
+ * function shall return the message.
+ *
+ * @param delivery An incoming delivery from a link
+ * @return A pointer to the complete message or 0 if the message is not yet complete.
+ */
+dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+
+/**
+ * Send the message outbound on an outgoing link.
+ *
+ * @param msg A pointer to a message to be sent.
+ * @param link The outgoing link on which to send the message.
+ */
+void dx_message_send(dx_message_t *msg, dx_link_t *link);
+
+/**
+ * Check that the message is well-formed up to a certain depth. Any part of the message that is
+ * beyond the specified depth is not checked for validity.
+ */
int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
+
+/**
+ * Return an iterator for the requested message field. If the field is not in the message,
+ * return NULL.
+ *
+ * @param msg A pointer to a message.
+ * @param field The field to be returned via iterator.
+ * @return A field iterator that spans the requested field.
+ */
dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field);
ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);
ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, void *buffer);
-pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm);
-
//
// Functions for composed messages
//
Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h Fri Sep 20 18:59:30 2013
@@ -29,9 +29,9 @@ typedef struct dx_address_t dx_address_t
typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
+const char *dx_router_id(const dx_dispatch_t *dx);
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
- bool is_local,
const char *address,
dx_router_message_cb handler,
void *context);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent);
- agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
+ agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
return agent;
}
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,24 +24,34 @@
#include <memory.h>
#include <stdio.h>
-typedef struct item_t item_t;
+typedef struct dx_alloc_type_t dx_alloc_type_t;
+typedef struct dx_alloc_item_t dx_alloc_item_t;
-struct item_t {
- DEQ_LINKS(item_t);
+struct dx_alloc_type_t {
+ DEQ_LINKS(dx_alloc_type_t);
dx_alloc_type_desc_t *desc;
};
-DEQ_DECLARE(item_t, item_list_t);
+DEQ_DECLARE(dx_alloc_type_t, dx_alloc_type_list_t);
+
+
+struct dx_alloc_item_t {
+ DEQ_LINKS(dx_alloc_item_t);
+};
+
+DEQ_DECLARE(dx_alloc_item_t, dx_alloc_item_list_t);
+
struct dx_alloc_pool_t {
- item_list_t free_list;
+ dx_alloc_item_list_t free_list;
};
dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0};
dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0};
+#define BIG_THRESHOLD 256
-static sys_mutex_t *init_lock;
-static item_list_t type_list;
+static sys_mutex_t *init_lock;
+static dx_alloc_type_list_t type_list;
static void dx_alloc_init(dx_alloc_type_desc_t *desc)
{
@@ -56,7 +66,7 @@ static void dx_alloc_init(dx_alloc_type_
if (!desc->global_pool) {
if (desc->config == 0)
- desc->config = desc->total_size > 256 ?
+ desc->config = desc->total_size > BIG_THRESHOLD ?
&dx_alloc_default_config_big : &dx_alloc_default_config_small;
assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
@@ -66,12 +76,12 @@ static void dx_alloc_init(dx_alloc_type_
desc->lock = sys_mutex();
desc->stats = NEW(dx_alloc_stats_t);
memset(desc->stats, 0, sizeof(dx_alloc_stats_t));
- }
- item_t *type_item = NEW(item_t);
- DEQ_ITEM_INIT(type_item);
- type_item->desc = desc;
- DEQ_INSERT_TAIL(type_list, type_item);
+ dx_alloc_type_t *type_item = NEW(dx_alloc_type_t);
+ DEQ_ITEM_INIT(type_item);
+ type_item->desc = desc;
+ DEQ_INSERT_TAIL(type_list, type_item);
+ }
sys_mutex_unlock(init_lock);
}
@@ -103,7 +113,7 @@ void *dx_alloc(dx_alloc_type_desc_t *des
// list and return it. Since everything we've touched is thread-local,
// there is no need to acquire a lock.
//
- item_t *item = DEQ_HEAD(pool->free_list);
+ dx_alloc_item_t *item = DEQ_HEAD(pool->free_list);
if (item) {
DEQ_REMOVE_HEAD(pool->free_list);
return &item[1];
@@ -130,11 +140,10 @@ void *dx_alloc(dx_alloc_type_desc_t *des
// Allocate a full batch from the heap and put it on the thread list.
//
for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
- item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
+ item = (dx_alloc_item_t*) malloc(sizeof(dx_alloc_item_t) + desc->total_size);
if (item == 0)
break;
DEQ_ITEM_INIT(item);
- item->desc = desc;
DEQ_INSERT_TAIL(pool->free_list, item);
desc->stats->held_by_threads++;
desc->stats->total_alloc_from_heap++;
@@ -154,7 +163,7 @@ void *dx_alloc(dx_alloc_type_desc_t *des
void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p)
{
- item_t *item = ((item_t*) p) - 1;
+ dx_alloc_item_t *item = ((dx_alloc_item_t*) p) - 1;
int idx;
//
@@ -217,7 +226,7 @@ static void alloc_schema_handler(void *c
static void alloc_query_handler(void* context, const char *id, void *cor)
{
- item_t *item = DEQ_HEAD(type_list);
+ dx_alloc_type_t *item = DEQ_HEAD(type_list);
while (item) {
dx_agent_value_string(cor, "name", item->desc->type_name);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,30 +21,10 @@
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/amqp.h>
-#include "message_private.h"
#include "compose_private.h"
#include <memory.h>
-typedef struct dx_composite_t {
- DEQ_LINKS(struct dx_composite_t);
- int isMap;
- uint32_t count;
- uint32_t length;
- dx_field_location_t length_location;
- dx_field_location_t count_location;
-} dx_composite_t;
-
-ALLOC_DECLARE(dx_composite_t);
ALLOC_DEFINE(dx_composite_t);
-DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
-
-
-struct dx_composed_field_t {
- dx_buffer_list_t buffers;
- dx_field_stack_t fieldStack;
-};
-
-ALLOC_DECLARE(dx_composed_field_t);
ALLOC_DEFINE(dx_composed_field_t);
@@ -197,7 +177,7 @@ static void dx_compose_end_composite(dx_
//
dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack);
if (enclosing) {
- enclosing->length += 4 + comp->length;
+ enclosing->length += (comp->length - 4); // the length and count were already accounted for
enclosing->count++;
}
@@ -233,12 +213,14 @@ void dx_compose_free(dx_composed_field_t
while (buf) {
DEQ_REMOVE_HEAD(field->buffers);
dx_free_buffer(buf);
+ buf = DEQ_HEAD(field->buffers);
}
dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
while (comp) {
DEQ_REMOVE_HEAD(field->fieldStack);
free_dx_composite_t(comp);
+ comp = DEQ_HEAD(field->fieldStack);
}
free_dx_composed_field_t(field);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,28 @@
*/
#include <qpid/dispatch/compose.h>
+#include "message_private.h"
dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field);
+typedef struct dx_composite_t {
+ DEQ_LINKS(struct dx_composite_t);
+ int isMap;
+ uint32_t count;
+ uint32_t length;
+ dx_field_location_t length_location;
+ dx_field_location_t count_location;
+} dx_composite_t;
+
+ALLOC_DECLARE(dx_composite_t);
+DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
+
+
+struct dx_composed_field_t {
+ dx_buffer_list_t buffers;
+ dx_field_stack_t fieldStack;
+};
+
+ALLOC_DECLARE(dx_composed_field_t);
+
#endif
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/config.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/config.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/config.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/config.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/log.h>
-#define PYTHON_MODULE "config"
+#define PYTHON_MODULE "qpid.dispatch.config"
static const char *log_module = "CONFIG";
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/container.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/container.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/container.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t);
ALLOC_DEFINE(dx_node_t);
ALLOC_DEFINE(dx_link_item_t);
+
struct dx_link_t {
pn_link_t *pn_link;
void *context;
@@ -56,6 +57,19 @@ struct dx_link_t {
ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);
+
+struct dx_delivery_t {
+ pn_delivery_t *pn_delivery;
+ dx_delivery_t *peer;
+ void *context;
+ uint64_t disposition;
+ dx_link_t *link;
+};
+
+ALLOC_DECLARE(dx_delivery_t);
+ALLOC_DEFINE(dx_delivery_t);
+
+
typedef struct dxc_node_type_t {
DEQ_LINKS(struct dxc_node_type_t);
const dx_node_type_t *ntype;
@@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_lin
}
-static void process_receive(pn_delivery_t *delivery)
+static void do_receive(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
if (link) {
dx_node_t *node = link->node;
if (node) {
+ if (!delivery) {
+ delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+ }
+
node->ntype->rx_handler(node->context, link, delivery);
return;
}
@@ -198,34 +223,18 @@ static void process_receive(pn_delivery_
//
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
-}
-
-
-static void do_send(pn_delivery_t *delivery)
-{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
-
- if (link) {
- dx_node_t *node = link->node;
- if (node) {
- node->ntype->tx_handler(node->context, link, delivery);
- return;
- }
- }
-
- // TODO - Cancel the delivery
+ pn_delivery_update(pnd, PN_REJECTED);
+ pn_delivery_settle(pnd);
}
-static void do_updated(pn_delivery_t *delivery)
+static void do_updated(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
- if (link) {
+ if (link && delivery) {
dx_node_t *node = link->node;
if (node)
node->ntype->disp_handler(node->context, link, delivery);
@@ -239,15 +248,15 @@ static int close_handler(void* unused, p
// Close all links, passing False as the 'closed' argument. These links are not
// being properly 'detached'. They are being orphaned.
//
- pn_link_t *pn_link = pn_link_head(conn, 0);
+ pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (pn_link) {
dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
dx_node_t *node = link->node;
- if (node)
+ if (node && link)
node->ntype->link_detach_handler(node->context, link, 0);
pn_link_close(pn_link);
free_dx_link_t(link);
- pn_link = pn_link_next(pn_link, 0);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
}
// teardown all sessions
@@ -305,9 +314,7 @@ static int process_handler(dx_container_
delivery = pn_work_head(conn);
while (delivery) {
if (pn_delivery_readable(delivery))
- process_receive(delivery);
- else if (pn_delivery_writable(delivery))
- do_send(delivery);
+ do_receive(delivery);
if (pn_delivery_updated(delivery)) {
do_updated(delivery);
@@ -318,15 +325,15 @@ static int process_handler(dx_container_
}
//
- // Step 2.5: Traverse all of the links on the connection looking for
- // outgoing links with non-zero credit. Call the attached node's
- // writable handler for such links.
+ // Step 2.5: Call the attached node's writable handler for all active links
+ // on the connection. Note that in Dispatch, links are considered
+ // bidirectional. Incoming and outgoing only pertains to deliveries and
+ // deliveries are a subset of the traffic that flows both directions on links.
//
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (pn_link) {
assert(pn_session_connection(pn_link_session(pn_link)) == conn);
- if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
- event_count += do_writable(pn_link);
+ event_count += do_writable(pn_link);
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
@@ -513,10 +520,10 @@ int dx_container_register_node_type(dx_d
}
-void dx_container_set_default_node_type(dx_dispatch_t *dx,
- const dx_node_type_t *nt,
- void *context,
- dx_dist_mode_t supported_dist)
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx,
+ const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
{
dx_container_t *container = dx->container;
@@ -530,6 +537,8 @@ void dx_container_set_default_node_type(
container->default_node = 0;
dx_log(module, LOG_TRACE, "Default node removed");
}
+
+ return container->default_node;
}
@@ -699,3 +708,110 @@ void dx_link_close(dx_link_t *link)
}
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
+{
+ pn_link_t *pnl = dx_link_pn(link);
+
+ //
+ // If there is a current delivery on this outgoing link, something
+ // is wrong with the delivey algorithm. We assume that the current
+ // delivery ('pnd' below) is the one created by pn_delivery. If it is
+ // not, then my understanding of how proton works is incorrect.
+ //
+ assert(!pn_link_current(pnl));
+
+ pn_delivery(pnl, tag);
+ pn_delivery_t *pnd = pn_link_current(pnl);
+
+ if (!pnd)
+ return 0;
+
+ dx_delivery_t *delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+
+ return delivery;
+}
+
+
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition)
+{
+ if (delivery->pn_delivery) {
+ if (final_disposition > 0)
+ pn_delivery_update(delivery->pn_delivery, final_disposition);
+ pn_delivery_set_context(delivery->pn_delivery, 0);
+ pn_delivery_settle(delivery->pn_delivery);
+ }
+ if (delivery->peer)
+ delivery->peer->peer = 0;
+ free_dx_delivery_t(delivery);
+}
+
+
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer)
+{
+ delivery->peer = peer;
+}
+
+
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context)
+{
+ delivery->context = context;
+}
+
+
+void *dx_delivery_context(dx_delivery_t *delivery)
+{
+ return delivery->context;
+}
+
+
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery)
+{
+ return delivery->peer;
+}
+
+
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery)
+{
+ return delivery->pn_delivery;
+}
+
+
+void dx_delivery_settle(dx_delivery_t *delivery)
+{
+ if (delivery->pn_delivery) {
+ pn_delivery_settle(delivery->pn_delivery);
+ delivery->pn_delivery = 0;
+ }
+}
+
+
+bool dx_delivery_settled(dx_delivery_t *delivery)
+{
+ return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool dx_delivery_disp_changed(dx_delivery_t *delivery)
+{
+ return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t dx_delivery_disp(dx_delivery_t *delivery)
+{
+ delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+ return delivery->disposition;
+}
+
+
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery)
+{
+ return delivery->link;
+}
+
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ DEQ_DECLARE(hash_item_t, items_t);
typedef struct bucket_t {
- items_t items;
+ items_t items;
} bucket_t;
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -367,7 +367,7 @@ dx_field_iterator_t *dx_field_iterator_s
void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length)
{
// TODO - Make this more efficient.
- for (uint8_t idx = 0; idx < length; idx++)
+ for (uint8_t idx = 0; idx < length && !dx_field_iterator_end(iter); idx++)
dx_field_iterator_octet(iter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org