You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [9/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py Fri Sep 20 18:59:30 2013
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, s
 import traceback
 from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from brokertest import *
+from ha_test import HaPort
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
     def setUp(self):
         BrokerTest.setUp(self)
         os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
-        self.broker = self.amqp_broker()
+        self.port_holder = HaPort(self)
+        self.broker = self.amqp_broker(port_holder=self.port_holder)
         self.default_config = Config(self.broker)
         self.agent = BrokerAgent(self.broker.connect())
 
@@ -126,6 +128,9 @@ class AmqpBrokerTest(BrokerTest):
     def test_translate2(self):
         self.send_and_receive(send_config=Config(self.broker, version="amqp0-10"))
 
+    def test_translate_with_large_routingkey(self):
+        self.send_and_receive(send_config=Config(self.broker, address="amq.topic/a.%s" % ("x" * 256), version="amqp1.0"), recv_config=Config(self.broker, address="amq.topic/a.*", version="amqp0-10"), wait_for_receiver=True)
+
     def send_and_receive_empty(self, send_config=None, recv_config=None):
         sconfig = send_config or self.default_config
         rconfig = recv_config or self.default_config
@@ -218,16 +223,22 @@ class AmqpBrokerTest(BrokerTest):
         assert len(domains) == 1
         assert domains[0].name == "BrokerB"
 
-    def test_incoming_link(self):
+    def incoming_link(self, mechanism):
         brokerB = self.amqp_broker()
         agentB = BrokerAgent(brokerB.connect())
         self.agent.create("queue", "q")
         agentB.create("queue", "q")
-        self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+        self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism})
         self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
         #send to brokerB, receive from brokerA
         self.send_and_receive(send_config=Config(brokerB))
 
+    def test_incoming_link_anonymous(self):
+        self.incoming_link("ANONYMOUS")
+
+    def test_incoming_link_nosasl(self):
+        self.incoming_link("NONE")
+
     def test_outgoing_link(self):
         brokerB = self.amqp_broker()
         agentB = BrokerAgent(brokerB.connect())
@@ -246,14 +257,73 @@ class AmqpBrokerTest(BrokerTest):
         #send to q on broker B through brokerA
         self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
 
+    def test_reconnect(self):
+        receiver_cmd = ["qpid-receive",
+               "--broker", self.broker.host_port(),
+               "--address=amq.fanout",
+               "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
+               "--timeout=10", "--print-content=true", "--print-headers=false"
+               ]
+        receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+        sender_cmd = ["qpid-send",
+               "--broker", self.broker.host_port(),
+               "--address=amq.fanout",
+               "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+               "--content-stdin", "--send-eos=1"
+               ]
+        sender = self.popen(sender_cmd, stdin=PIPE)
+        sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
+
+
+        batch1 = ["message-%s" % (i+1) for i in range(10000)]
+        for m in batch1:
+            sender.stdin.write(m + "\n")
+            sender.stdin.flush()
+
+        self.broker.kill()
+        self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+        batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+        for m in batch2:
+            sender.stdin.write(m + "\n")
+            sender.stdin.flush()
+
+        sender.stdin.close()
+
+        last = None
+        m = receiver.stdout.readline().rstrip()
+        while len(m):
+            last = m
+            m = receiver.stdout.readline().rstrip()
+        assert last == "message-20000", (last)
+
     """ Create and return a broker with AMQP 1.0 support """
     def amqp_broker(self):
         assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+        self.port_holder = HaPort(self) #reserve port
         args = ["--load-module", BrokerTest.amqp_lib,
-                "--max-negotiate-time=600000",
+                "--socket-fd=%s" % self.port_holder.fileno,
+                "--listen-disable=tcp",
                 "--log-enable=trace+:Protocol",
                 "--log-enable=info+"]
-        return BrokerTest.broker(self, args)
+        return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+    def amqp_broker(self, port_holder=None):
+        assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+        if port_holder:
+            args = ["--load-module", BrokerTest.amqp_lib,
+                    "--socket-fd=%s" % port_holder.fileno,
+                    "--listen-disable=tcp",
+                    "--log-enable=trace+:Protocol",
+                    "--log-enable=info+"]
+            return BrokerTest.broker(self, args, port=port_holder.port)
+        else:
+            args = ["--load-module", BrokerTest.amqp_lib,
+                    "--log-enable=trace+:Protocol",
+                    "--log-enable=info+"]
+            return BrokerTest.broker(self, args)
+
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpid-receive.cpp Fri Sep 20 18:59:30 2013
@@ -69,11 +69,12 @@ struct Options : public qpid::Options
     string readyAddress;
     uint receiveRate;
     std::string replyto;
+    bool noReplies;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
           help(false),
-          url("amqp:tcp:127.0.0.1"),
+          url("127.0.0.1"),
           timeout(0),
           forever(false),
           messages(0),
@@ -91,7 +92,8 @@ struct Options : public qpid::Options
           reportTotal(false),
           reportEvery(0),
           reportHeader(true),
-          receiveRate(0)
+          receiveRate(0),
+          noReplies(false)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -116,6 +118,7 @@ struct Options : public qpid::Options
             ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
             ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
+            ("ignore-reply-to", qpid::optValue(noReplies), "Do not send replies even if reply-to is set")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -222,6 +225,7 @@ int main(int argc, char ** argv)
                         if (opts.printHeaders) {
                             if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl;
                             if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl;
+                            if (msg.getMessageId().size()) std::cout << "MessageId: " << msg.getMessageId() << std::endl;
                             if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
                             if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
                             if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
@@ -231,8 +235,10 @@ int main(int argc, char ** argv)
                             std::cout << "Properties: " << msg.getProperties() << std::endl;
                             std::cout << std::endl;
                         }
-                        if (opts.printContent)
-                            std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+                        if (opts.printContent) {
+                            if (!msg.getContentObject().isVoid()) std::cout << msg.getContentObject() << std::endl;
+                            else std::cout << msg.getContent() << std::endl;
+                        }
                         if (opts.messages && count >= opts.messages) done = true;
                     }
                 }
@@ -245,7 +251,7 @@ int main(int argc, char ** argv)
                 } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
                     session.acknowledge();
                 }
-                if (msg.getReplyTo()) { // Echo message back to reply-to address.
+                if (msg.getReplyTo() && !opts.noReplies) { // Echo message back to reply-to address.
                     Sender& s = replyTo[msg.getReplyTo().str()];
                     if (s.isNull()) {
                         s = session.createSender(msg.getReplyTo());
@@ -260,8 +266,6 @@ int main(int argc, char ** argv)
                     int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
                     if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
                 }
-                // Clear out message properties & content for next iteration.
-                msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
             }
             if (opts.reportTotal) reporter.report();
             if (opts.tx) {

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpid-send.cpp Fri Sep 20 18:59:30 2013
@@ -96,7 +96,7 @@ struct Options : public qpid::Options
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
           help(false),
-          url("amqp:tcp:127.0.0.1"),
+          url("127.0.0.1"),
           messages(1),
           sendEos(0),
           durable(false),
@@ -262,9 +262,8 @@ class MapContentGenerator   : public Con
   public:
     MapContentGenerator(const Options& opt) : opts(opt) {}
     virtual bool setContent(Message& msg) {
-        Variant::Map map;
-        opts.setEntries(map);
-        encode(map, msg);
+        msg.getContentObject() = qpid::types::Variant::Map();
+        opts.setEntries(msg.getContentObject().asMap());
         return true;
     }
   private:
@@ -371,6 +370,7 @@ int main(int argc, char ** argv)
                 msg.setReplyTo(Address(opts.replyto));
             }
             if (!opts.userid.empty()) msg.setUserId(opts.userid);
+            if (!opts.id.empty()) msg.setMessageId(opts.id);
             if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
             opts.setProperties(msg);
             uint sent = 0;

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpidd_qmfv2_tests.py Fri Sep 20 18:59:30 2013
@@ -39,20 +39,25 @@ class ConsoleTest(BrokerTest):
 
     def setUp(self):
         BrokerTest.setUp(self)
-        args = ["--mgmt-qmf1=no",
-                "--mgmt-pub-interval=%d" % self.PUB_INTERVAL]
+
+    def _startBroker(self, QMFv1=False ):
+        self._broker_is_v1 = QMFv1
+        if self._broker_is_v1:
+            args = ["--mgmt-qmf1=yes", "--mgmt-qmf2=no"]
+        else:
+            args = ["--mgmt-qmf1=no", "--mgmt-qmf2=yes"]
+
+        args.append("--mgmt-pub-interval=%d" % self.PUB_INTERVAL)
         self.broker = BrokerTest.broker(self, args)
 
-    def _startQmfV2(self, broker, console=None):
+
+    def _myStartQmf(self, broker, console=None):
         # I manually set up the QMF session here rather than call the startQmf
         # method from BrokerTest as I can guarantee the console library is used
         # (assuming BrokerTest's implementation of startQmf could change)
         self.qmf_session = qmf.console.Session(console)
         self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(),
                                                                 broker.port()))
-        self.assertEqual(self.qmf_broker.getBrokerAgent().isV2, True,
-                         "Expected broker agent to support QMF V2")
-
 
     def _create_queue( self, q_name, args={} ):
         broker = self.qmf_session.getObjects(_class="broker")[0]
@@ -60,11 +65,11 @@ class ConsoleTest(BrokerTest):
         self.assertEqual(result.status, 0, result)
 
 
-    def test_method_call(self):
+    def _test_method_call(self):
         """ Verify method calls work, and check the behavior of getObjects()
         call
         """
-        self._startQmfV2( self.broker )
+        self._myStartQmf( self.broker )
         self._create_queue( "fleabag", {"auto-delete":True} )
 
         qObj = None
@@ -76,12 +81,16 @@ class ConsoleTest(BrokerTest):
         self.assertNotEqual(qObj, None, "Failed to get queue object")
         #print qObj
 
-    def test_unsolicited_updates(self):
+    def _test_unsolicited_updates(self):
         """ Verify that the Console callbacks work
         """
 
         class Handler(qmf.console.Console):
             def __init__(self):
+                self.v1_oids = 0
+                self.v1_events = 0
+                self.v2_oids = 0
+                self.v2_events = 0
                 self.broker_info = []
                 self.broker_conn = []
                 self.newpackage = []
@@ -109,28 +118,38 @@ class ConsoleTest(BrokerTest):
             def event(self, broker, event):
                 #print "EVENT %s" % event
                 self.events.append(event)
-            def objectProps(self, broker, record):
-                #print "ObjProps %s" % record
-                assert len(record.getProperties()), "objectProps() invoked with no properties?"
+                if event.isV2:
+                    self.v2_events += 1
+                else:
+                    self.v1_events += 1
+
+            def heartbeat(self, agent, timestamp):
+                #print "Heartbeat %s" % agent
+                self.heartbeats.append( (agent, timestamp) )
+
+            # generic handler for objectProps and objectStats
+            def _handle_obj_update(self, record):
                 oid = record.getObjectId()
+                if oid.isV2:
+                    self.v2_oids += 1
+                else:
+                    self.v1_oids += 1
+
                 if oid not in self.updates:
                     self.updates[oid] = record
                 else:
                     self.updates[oid].mergeUpdate( record )
+
+            def objectProps(self, broker, record):
+                assert len(record.getProperties()), "objectProps() invoked with no properties?"
+                self._handle_obj_update(record)
+
             def objectStats(self, broker, record):
-                #print "ObjStats %s" % record
                 assert len(record.getStatistics()), "objectStats() invoked with no properties?"
-                oid = record.getObjectId()
-                if oid not in self.updates:
-                    self.updates[oid] = record
-                else:
-                    self.updates[oid].mergeUpdate( record )
-            def heartbeat(self, agent, timestamp):
-                #print "Heartbeat %s" % agent
-                self.heartbeats.append( (agent, timestamp) )
+                self._handle_obj_update(record)
 
         handler = Handler()
-        self._startQmfV2( self.broker, handler )
+        self._myStartQmf( self.broker, handler )
         # this should force objectProps, queueDeclare Event callbacks
         self._create_queue( "fleabag", {"auto-delete":True} )
         # this should force objectStats callback
@@ -163,7 +182,15 @@ class ConsoleTest(BrokerTest):
                 break
         assert msgs == 3, "msgDepth statistics not accurate!"
 
-    def test_async_method(self):
+        # verify that the published objects were of the correct QMF version
+        if self._broker_is_v1:
+            assert handler.v1_oids and handler.v2_oids == 0, "QMFv2 updates received while in V1-only mode!"
+            assert handler.v1_events and handler.v2_events == 0, "QMFv2 events received while in V1-only mode!"
+        else:
+            assert handler.v2_oids and handler.v1_oids == 0, "QMFv1 updates received while in V2-only mode!"
+            assert handler.v2_events and handler.v1_events == 0, "QMFv1 events received while in V2-only mode!"
+
+    def _test_async_method(self):
         class Handler (qmf.console.Console):
             def __init__(self):
                 self.cv = Condition()
@@ -207,12 +234,40 @@ class ConsoleTest(BrokerTest):
                     return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
 
         handler = Handler()
-        self._startQmfV2(self.broker, handler)
+        self._myStartQmf(self.broker, handler)
         broker = self.qmf_session.getObjects(_class="broker")[0]
         handler.request(broker, 20)
         sleep(1)
         self.assertEqual(handler.check(), "pass")
 
+    def test_method_call(self):
+        self._startBroker()
+        self._test_method_call()
+
+    def test_unsolicited_updates(self):
+        self._startBroker()
+        self._test_unsolicited_updates()
+
+    def test_async_method(self):
+        self._startBroker()
+        self._test_async_method()
+
+    # For now, include "QMFv1 only" tests.  Once QMFv1 is deprecated, these can
+    # be removed
+
+    def test_method_call_v1(self):
+        self._startBroker(QMFv1=True)
+        self._test_method_call()
+
+    def test_unsolicited_updates_v1(self):
+        self._startBroker(QMFv1=True)
+        self._test_unsolicited_updates()
+
+    def test_async_method_v1(self):
+        self._startBroker(QMFv1=True)
+        self._test_async_method()
+
+
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpidt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpidt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpidt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpidt Fri Sep 20 18:59:30 2013
@@ -117,7 +117,7 @@ class Manager:
                     if k == "name":
                         name = v
                     elif v:
-                        if isinstance(v, dict) and v["_object_name"]:
+                        if isinstance(v, dict) and "_object_name" in v:
                             v = v["_object_name"]
                         details += "%s=%s " %(k,v)
                 print "%-25s %s" % (name, details)

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/queue_flow_limit_tests.py Fri Sep 20 18:59:30 2013
@@ -27,6 +27,8 @@ from os import environ, popen
 
 class QueueFlowLimitTests(TestBase010):
 
+    _timeout = 100
+
     def __getattr__(self, name):
         if name == "assertGreater":
             return lambda a, b: self.failUnless(a > b)
@@ -156,7 +158,7 @@ class QueueFlowLimitTests(TestBase010):
         totalMsgs = 1213 + 797 + 331
 
         # wait until flow control is active
-        deadline = time() + 10
+        deadline = time() + self._timeout
         while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
                 time() < deadline:
             pass
@@ -209,7 +211,7 @@ class QueueFlowLimitTests(TestBase010):
         totalMsgs = 1699 + 1129 + 881
 
         # wait until flow control is active
-        deadline = time() + 10
+        deadline = time() + self._timeout
         while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
                 time() < deadline:
             pass
@@ -255,7 +257,7 @@ class QueueFlowLimitTests(TestBase010):
 
         # fill up the queue, waiting until flow control is active
         sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
-        deadline = time() + 10
+        deadline = time() + self._timeout
         while (not testq.mgmt.flowStopped) and time() < deadline:
             testq.mgmt.update()
 
@@ -357,7 +359,7 @@ class QueueFlowLimitTests(TestBase010):
 
         sender = BlockedSender(self, "kill-q", count=100)
         # wait for flow control
-        deadline = time() + 10
+        deadline = time() + self._timeout
         while (not q.flowStopped) and time() < deadline:
             q.update()
 

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/rsynchosts Fri Sep 20 18:59:30 2013
@@ -27,13 +27,21 @@ abspath() {
 }
 
 usage() {
-    echo "Usage: $(basename $0) file [file...]
+    echo "Usage: $(basename $0) [-l user] file [file...]
 Synchronize the contents of each file or directory to the same absolute path on
 each host in \$HOSTS.
 "
     exit 1
 }
 
+while getopts "l:" opt; do
+    case $opt in
+	l) RSYNC_USER="$OPTARG@" ;;
+	*) usage ;;
+    esac
+done
+shift `expr $OPTIND - 1`
+
 test "$*" || usage
 
 for f in $*; do FILES="$FILES $(abspath $f)" || exit 1; done
@@ -42,7 +50,7 @@ OK_FILE=`mktemp`		# Will be deleted if a
 trap "rm -f $OK_FILE" EXIT
 
 for h in $HOSTS; do
-    rsync -aRO --delete $FILES $h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } &
+    rsync -vaRO --delete $FILES $RSYNC_USER$h:/ || { echo "rsync to $h failed"; rm -f $OK_FILE; } &
 done
 wait
 test -f $OK_FILE

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/run_cli_tests Fri Sep 20 18:59:30 2013
@@ -44,8 +44,8 @@ start_brokers() {
     # look like they're xml related.
     # if we start supporting xml on windows, it will need something similar 
     # here
-    if [ -f ../.libs/xml.so ] ; then
-	xargs="--load-module ../.libs/xml.so"
+    if [ -f ../xml.so ] ; then
+	xargs="--load-module ../xml.so"
 	if [ ! -f test.xquery ] ; then 
 	    create_test_xquery
 	fi

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/run_federation_tests Fri Sep 20 18:59:30 2013
@@ -25,7 +25,7 @@ source ./test_env.sh
 #set -x
 trap stop_brokers INT TERM QUIT
 
-if [ -f ../.libs/xml.so ] ; then
+if [ -f ../xml.so ] ; then
     MODULES="--load-module xml" # Load the XML exchange and run XML exchange federation tests
     SKIPTESTS=
 else

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_env.ps1.in Fri Sep 20 18:59:30 2013
@@ -41,7 +41,6 @@ $PYTHON_COMMANDS="$QPID_TOOLS\src\py"
 $env:PYTHONPATH="$srcdir;$PYTHON_DIR;$PYTHON_COMMANDS;$QPID_TESTS_PY;$QPID_TOOLS_LIBS;$QMF_LIB;$env:PYTHONPATH"
 $QPID_CONFIG_EXEC="$PYTHON_COMMANDS\qpid-config"
 $QPID_ROUTE_EXEC="$PYTHON_COMMANDS\qpid-route"
-$QPID_CLUSTER_EXEC="$PYTHON_COMMANDS\qpid-cluster"
 $QPID_HA_TOOL_EXEC="$PYTHON_COMMANDS\qpid-ha-tool"
 
 # Executables

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in Fri Sep 20 18:59:30 2013
@@ -25,6 +25,8 @@ builddir=`absdir @abs_builddir@`
 top_srcdir=`absdir @abs_top_srcdir@`
 top_builddir=`absdir @abs_top_builddir@`
 moduledir=$top_builddir/src@builddir_lib_suffix@
+pythonswigdir=$top_builddir/bindings/qpid/python/
+pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
 testmoduledir=$builddir@builddir_lib_suffix@
 export QPID_INSTALL_PREFIX=@prefix@
 
@@ -44,7 +46,8 @@ export PYTHONPATH=$srcdir:$PYTHON_DIR:$P
 export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
 export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
 export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha
-
+export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir
+export PYTHONSWIGMODULE=$pythonswigdir/qpid_messaging.py
 # Executables
 export QPIDD_EXEC=$top_builddir/src/qpidd
 
@@ -78,5 +81,5 @@ if [ ! -e "$HOME" ]; then
 fi
 
 # Options for boost test framework
-export BOOST_TEST_SHOW_PROGRESS=yes
-export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
+test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes
+test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_store.cpp Fri Sep 20 18:59:30 2013
@@ -40,14 +40,19 @@
 #include "qpid/sys/Thread.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Msg.h"
 #include <boost/cast.hpp>
 #include <boost/lexical_cast.hpp>
 #include <memory>
+#include <ostream>
 #include <fstream>
+#include <sstream>
 
-using namespace qpid;
-using namespace broker;
 using namespace std;
+using namespace boost;
+using namespace qpid;
+using namespace qpid::broker;
 using namespace qpid::sys;
 
 namespace qpid {
@@ -57,11 +62,19 @@ struct TestStoreOptions : public Options
 
     string name;
     string dump;
+    string events;
+    vector<string> throwMsg; // Throw exception if message content matches.
 
     TestStoreOptions() : Options("Test Store Options") {
         addOptions()
-            ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
-            ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+            ("test-store-name", optValue(name, "NAME"),
+             "Name of test store instance.")
+            ("test-store-dump", optValue(dump, "FILE"),
+             "File to dump enqueued messages.")
+            ("test-store-events", optValue(events, "FILE"),
+             "File to log events, 1 line per event.")
+            ("test-store-throw", optValue(throwMsg, "CONTENT"),
+             "Throw exception if message content matches.")
             ;
     }
 };
@@ -82,24 +95,76 @@ class TestStore : public NullMessageStor
     TestStore(const TestStoreOptions& opts, Broker& broker_)
         : options(opts), name(opts.name), broker(broker_)
     {
-        QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
-        if (!options.dump.empty()) 
+        QPID_LOG(info, "TestStore name=" << name
+                 << " dump=" << options.dump
+                 << " events=" << options.events
+                 << " throw messages =" << options.throwMsg.size());
+
+        if (!options.dump.empty())
             dump.reset(new ofstream(options.dump.c_str()));
+        if (!options.events.empty())
+            events.reset(new ofstream(options.events.c_str()));
     }
 
     ~TestStore() {
         for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
     }
 
-    virtual bool isNull() const { return false; }
-    
-    void enqueue(TransactionContext* ,
+    // Dummy transaction context.
+    struct TxContext : public TPCTransactionContext {
+        static int nextId;
+        string id;
+        TxContext() : id(lexical_cast<string>(nextId++)) {}
+        TxContext(string xid) : id(xid) {}
+    };
+
+    static string getId(const TransactionContext& tx) {
+        const TxContext* tc = dynamic_cast<const TxContext*>(&tx);
+        assert(tc);
+        return tc->id;
+    }
+
+
+    bool isNull() const { return false; }
+
+    void log(const string& msg) {
+        QPID_LOG(info, "test_store: " << msg);
+        if (events.get()) *events << msg << endl << std::flush;
+    }
+
+    auto_ptr<TransactionContext> begin() {
+        auto_ptr<TxContext> tx(new TxContext());
+        log(Msg() << "<begin tx " << tx->id << ">");
+        return auto_ptr<TransactionContext>(tx);
+    }
+
+    auto_ptr<TPCTransactionContext> begin(const std::string& xid)  {
+        auto_ptr<TxContext> tx(new TxContext(xid));
+        log(Msg() << "<begin tx " << tx->id << ">");
+        return auto_ptr<TPCTransactionContext>(tx);
+    }
+
+    string getContent(const intrusive_ptr<PersistableMessage>& msg) {
+        intrusive_ptr<broker::Message::Encoding> enc(
+            dynamic_pointer_cast<broker::Message::Encoding>(msg));
+        return enc->getContent();
+    }
+
+    void enqueue(TransactionContext* tx,
                  const boost::intrusive_ptr<PersistableMessage>& pmsg,
-                 const PersistableQueue& )
+                 const PersistableQueue& queue)
     {
-        qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
+        QPID_LOG(debug, "TestStore enqueue " << queue.getName());
+        qpid::broker::amqp_0_10::MessageTransfer* msg =
+            dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
         assert(msg);
 
+        ostringstream o;
+        o << "<enqueue " << queue.getName() << " " << getContent(msg);
+        if (tx) o << " tx=" << getId(*tx);
+        o << ">";
+        log(o.str());
+
         // Dump the message if there is a dump file.
         if (dump.get()) {
             msg->getFrames().getMethod()->print(*dump);
@@ -113,7 +178,11 @@ class TestStore : public NullMessageStor
         string data = msg->getFrames().getContent();
         size_t i = string::npos;
         size_t j = string::npos;
-        if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
+        const vector<string>& throwMsg(options.throwMsg);
+        if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
+                throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+        }
+        else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
             && (i = data.find(name+"[")) != string::npos
             && (j = data.find("]", i)) != string::npos)
         {
@@ -144,6 +213,31 @@ class TestStore : public NullMessageStor
             msg->enqueueComplete();
     }
 
+    void dequeue(TransactionContext* tx,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const PersistableQueue& queue)
+    {
+        QPID_LOG(debug, "TestStore dequeue " << queue.getName());
+        ostringstream o;
+        o<< "<dequeue " << queue.getName() << " " << getContent(msg);
+        if (tx) o << " tx=" << getId(*tx);
+        o << ">";
+        log(o.str());
+    }
+
+    void prepare(TPCTransactionContext& txn) {
+        log(Msg() << "<prepare tx=" << getId(txn) << ">");
+    }
+
+    void commit(TransactionContext& txn) {
+        log(Msg() << "<commit tx=" << getId(txn) << ">");
+    }
+
+    void abort(TransactionContext& txn) {
+        log(Msg() << "<abort tx=" << getId(txn) << ">");
+    }
+
+
   private:
     static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
     TestStoreOptions options;
@@ -151,8 +245,11 @@ class TestStore : public NullMessageStor
     Broker& broker;
     vector<Thread> threads;
     std::auto_ptr<ofstream> dump;
+    std::auto_ptr<ofstream> events;
 };
 
+int TestStore::TxContext::nextId(1);
+
 const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
 const string TestStore::EXCEPTION = "exception";
 const string TestStore::EXIT_PROCESS = "exit_process";

Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_tools.h Fri Sep 20 18:59:30 2013
@@ -23,7 +23,6 @@
 #include <limits.h>             // Include before boost/test headers.
 #include <boost/test/test_tools.hpp>
 #include <boost/assign/list_of.hpp>
-#include <boost/assign/list_of.hpp>
 #include <vector>
 #include <set>
 #include <ostream>

Modified: qpid/branches/linearstore/qpid/doc/book/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/Makefile?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/Makefile (original)
+++ qpid/branches/linearstore/qpid/doc/book/Makefile Fri Sep 20 18:59:30 2013
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-DIRS = src/java-broker src/cpp-broker src/programming
+DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming
 
 
 .PHONY: all $(DIRS)

Modified: qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/Makefile.inc Fri Sep 20 18:59:30 2013
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-BOOK=$(wildcard *Book.xml Programming-In-Apache-Qpid.xml)
+BOOK=$(wildcard *Book.xml Programming-In-Apache-Qpid.xml JMS-Performance-Test-Framework.xml)
 XML=$(wildcard *.xml) $(wildcard ../common/*.xml)
 IMAGES=$(wildcard images/*.png) 
 CSS=$(wilcard ../common/css/*.css)

Modified: qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml Fri Sep 20 18:59:30 2013
@@ -113,12 +113,12 @@ under the License.
 	been replicated to a backup, then it doesn't need to be replicated.
       </para>
       <variablelist>
-	<title>Status of a HA broker</title>
+	<title>HA Broker States</title>
 	<varlistentry>
 	  <term>Joining</term>
 	  <listitem>
 	    <para>
-	      Initial status of a new broker that has not yet connected to the primary.
+	      Initial state of a new broker that has not yet connected to the primary.
 	    </para>
 	  </listitem>
 	</varlistentry>
@@ -299,7 +299,7 @@ ssl_addr = "ssl:" host [":" port]'
 		Specifies whether queues and exchanges are replicated by default.
 		<replaceable>VALUE</replaceable> is one of: <literal>none</literal>,
 		<literal>configuration</literal>, <literal>all</literal>.
-		For details see <xref linkend="ha-creating-replicated"/>.
+		For details see <xref linkend="ha-replicate-values"/>.
 	      </para>
 	    </entry>
 	  </row>
@@ -499,9 +499,8 @@ NOTE: fencing is not shown, you must con
       <command>qpidd</command> broker to primary status.
     </para>
     <para>
-      The <literal>resources</literal> section also defines a pair of virtual IP
-      addresses on different sub-nets. One will be used for broker-to-broker
-      communication, the other for client-to-broker.
+      The <literal>resources</literal> section also defines a virtual IP
+      address for clients.
     </para>
     <para>
       To take advantage of the virtual IP addresses, <filename>qpidd.conf</filename>

Modified: qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-REST-API.xml Fri Sep 20 18:59:30 2013
@@ -119,7 +119,7 @@
             <row>
               <entry>
                     <para>/rest/queue</para>
-                    <para>/rest/queue/&lt;virtual host name&gt;/&gt;queue name&gt;</para>
+                    <para>/rest/queue/&lt;virtual host name&gt;/&lt;queue name&gt;</para>
               </entry>
               <entry>Rest service to manage queue(s)</entry>
               <entry>Retrieves the details about the queue(s)</entry>
@@ -173,7 +173,7 @@
             </row>
             <row>
               <entry>
-                    <para>/rest/message/*</para>
+                    <para>/rest/message/&lt;virtual host name&gt;/&lt;queue name&gt;</para>
               </entry>
               <entry>Rest service to manage messages(s)</entry>
               <entry>Retrieves the details about the messages(s)</entry>
@@ -183,7 +183,7 @@
             </row>
             <row>
               <entry>
-                    <para>/rest/message-content/*</para>
+                    <para>/rest/message-content/&lt;virtual host name&gt;/&lt;queue name&gt;</para>
               </entry>
               <entry>Rest service to retrieve message content</entry>
               <entry>Retrieves the message content</entry>
@@ -275,22 +275,22 @@
         all bindings for all queues for the given exchange in the virtual host.
         </para>
         <example>
-        <title>Examples of queue creation using curl:</title>
+        <title>Examples of queue creation using curl (authenticating as user admin):</title>
             <programlisting><![CDATA[
 #create a durable queue
-curl -X PUT  -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
+curl --user admin -X PUT  -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
 #create a durable priority queue
-curl -X PUT  -d '{"durable":true,"type":"priority"}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
+curl --user admin -X PUT  -d '{"durable":true,"type":"priority"}' http://localhost:8080/rest/queue/<vhostname>/<queuename>
             ]]></programlisting>
         </example><example>
         <title>Example of binding a queue to an exchange using curl</title>
             <programlisting><![CDATA[
-curl  -X PUT  -d '{}' http://localhost:8080/rest/binding/<vhostname>/<exchangename>/<queue-name>/<binding-name>
+curl --user admin -X PUT  -d '{}' http://localhost:8080/rest/binding/<vhostname>/<exchangename>/<queue-name>/<binding-name>
             ]]></programlisting>
         </example>
         <para>
-            NOTE: the above examples were performed after editing the
-            <link linkend="Java-Broker-Configuring-And-Managing-HTTP-Management-Plugin-Configuration">HTTP Management Plugin Configuration</link>
-            to enable HTTP Basic Authentication on connections not using SSL (i.e HTTPS).
+            NOTE: These curl examples utilise unsecure HTTP transport. To use the examples it is first necessary to enable Basic
+            authentication for HTTP within the HTTP Management Configuration (it is off by default).
+            For details see <xref linkend="Java-Broker-Configuring-And-Managing-HTTP-Management-Plugin-Configuration"/>.
         </para>
 </section>

Modified: qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml (original)
+++ qpid/branches/linearstore/qpid/doc/book/src/java-broker/commonEntities.xml Fri Sep 20 18:59:30 2013
@@ -23,7 +23,7 @@
 <!ENTITY qpidProgrammingBook "../../Programming-In-Apache-Qpid/html/">
 <!ENTITY qpidCppBook "../../AMQP-Messaging-Broker-CPP-Book/html/">
 
-<!ENTITY qpidCurrentRelease "0.23">
+<!ENTITY qpidCurrentRelease "0.25">
 
 <!ENTITY windowsBrokerDownloadFileName "qpid-java-broker-&qpidCurrentRelease;.zip">
 <!ENTITY windowsExtractedBrokerDirName "qpid-broker-&qpidCurrentRelease;">
@@ -40,5 +40,5 @@
 <!ENTITY oracleBdbProductOverviewUrl "http://www.oracle.com/technetwork/products/berkeleydb/overview/index-093405.html">
 <!ENTITY oracleBdbRepGuideUrl "http://oracle.com/cd/E17277_02/html/ReplicationGuide/">
 <!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/">
-<!ENTITY oracleBdbProductVersion "5.0.73">
+<!ENTITY oracleBdbProductVersion "5.0.84">
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/CMakeLists.txt Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
 ## to you under the Apache License, Version 2.0 (the
 ## "License"); you may not use this file except in compliance
 ## with the License.  You may obtain a copy of the License at
-## 
+##
 ##   http://www.apache.org/licenses/LICENSE-2.0
-## 
+##
 ## Unless required by applicable law or agreed to in writing,
 ## software distributed under the License is distributed on an
 ## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ include(CheckLibraryExists)
 include(CheckSymbolExists)
 include(CheckFunctionExists)
 include(CheckIncludeFiles)
+include(FindPythonInterp)
 include(FindPythonLibs)
 
 enable_testing()
@@ -52,6 +53,14 @@ set(SYSCONF_INSTALL_DIR etc CACHE PATH "
 set(SHARE_INSTALL_DIR share CACHE PATH "Shared read only data directory")
 set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory")
 
+# determine the location for installing the python packages
+if (PYTHONLIBS_FOUND)
+    execute_process(COMMAND ${PYTHON_EXECUTABLE}
+                    -c "from distutils.sysconfig import get_python_lib; print get_python_lib(False)"
+                    OUTPUT_VARIABLE PYTHON_SITELIB_PACKAGES
+                    OUTPUT_STRIP_TRAILING_WHITESPACE)
+endif (PYTHONLIBS_FOUND)
+
 include_directories(
     ${CMAKE_CURRENT_SOURCE_DIR}/include
     ${CMAKE_CURRENT_SOURCE_DIR}/src
@@ -107,7 +116,53 @@ install(TARGETS qpid-dispatch
 file(GLOB headers "include/qpid/dispatch/*.h")
 install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/dispatch)
 install(FILES include/qpid/dispatch.h DESTINATION ${INCLUDE_INSTALL_DIR}/qpid)
+install(FILES etc/qpid-dispatch.conf DESTINATION ${SYSCONF_INSTALL_DIR})
+
+##
+## Python modules installation
+##
+set(PYTHON_STUBS_SOURCES
+    python/qpid/dispatch/stubs/__init__.py
+    python/qpid/dispatch/stubs/ioadapter.py
+    python/qpid/dispatch/stubs/logadapter.py
+)
+
+set(PYTHON_ROUTER_SOURCES
+    python/qpid/dispatch/router/link.py
+    python/qpid/dispatch/router/router_engine.py
+    python/qpid/dispatch/router/__init__.py
+    python/qpid/dispatch/router/adapter.py
+    python/qpid/dispatch/router/mobile.py
+    python/qpid/dispatch/router/node.py
+    python/qpid/dispatch/router/routing.py
+    python/qpid/dispatch/router/data.py
+    python/qpid/dispatch/router/configuration.py
+    python/qpid/dispatch/router/neighbor.py
+    python/qpid/dispatch/router/path.py
+    python/qpid/dispatch/router/binding.py
+)
+
+set(PYTHON_CONFIG_SOURCES
+    python/qpid/dispatch/config/parser.py
+    python/qpid/dispatch/config/__init__.py
+    python/qpid/dispatch/config/schema.py
+    python/qpid/dispatch/__init__.py
+)
+
+install(FILES ${PYTHON_STUBS_SOURCES}
+        DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/stubs)
+
+install(FILES ${PYTHON_ROUTER_SOURCES}
+        DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/router)
+
+install(FILES ${PYTHON_CONFIG_SOURCES}
+        DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch/config)
+
+install(FILES python/qpid/__init__.py
+        DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid)
 
+install(FILES python/qpid/dispatch/__init__.py
+        DESTINATION ${PYTHON_SITELIB_PACKAGES}/qpid/dispatch)
 ##
 ## Build Tests
 ##

Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Sep 20 18:59:30 2013
@@ -51,10 +51,11 @@ typedef enum {
 } dx_direction_t;
 
 
-typedef struct dx_node_t dx_node_t;
-typedef struct dx_link_t dx_link_t;
+typedef struct dx_node_t     dx_node_t;
+typedef struct dx_link_t     dx_link_t;
+typedef struct dx_delivery_t dx_delivery_t;
 
-typedef void (*dx_container_delivery_handler_t)    (void *node_context, dx_link_t *link, pn_delivery_t *delivery);
+typedef void (*dx_container_delivery_handler_t)    (void *node_context, dx_link_t *link, dx_delivery_t *delivery);
 typedef int  (*dx_container_link_handler_t)        (void *node_context, dx_link_t *link);
 typedef int  (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed);
 typedef void (*dx_container_node_handler_t)        (void *type_context, dx_node_t *node);
@@ -65,33 +66,78 @@ typedef struct {
     void *type_context;
     int   allow_dynamic_creation;
 
-    //
+    //=======================
     // Node-Instance Handlers
+    //=======================
+
+    //
+    // rx_handler - Invoked when a new received delivery is available for processing.
+    //
+    dx_container_delivery_handler_t rx_handler;
+
+    //
+    // disp_handler - Invoked when an existing delivery changes disposition
+    //                or settlement state.
+    //
+    dx_container_delivery_handler_t disp_handler;
+
+    //
+    // incoming_handler - Invoked when an attach for a new incoming link is received.
+    //
+    dx_container_link_handler_t incoming_handler;
+
+    //
+    // outgoing_handler - Invoked when an attach for a new outgoing link is received.
+    //
+    dx_container_link_handler_t outgoing_handler;
+
+    //
+    // writable_handler - Invoked when an outgoing link is available for sending either
+    //                    deliveries or disposition changes.  The handler must check the
+    //                    link's credit to determine whether (and how many) message
+    //                    deliveries may be sent.
     //
-    dx_container_delivery_handler_t     rx_handler;
-    dx_container_delivery_handler_t     tx_handler;
-    dx_container_delivery_handler_t     disp_handler;
-    dx_container_link_handler_t         incoming_handler;
-    dx_container_link_handler_t         outgoing_handler;
-    dx_container_link_handler_t         writable_handler;
-    dx_container_link_detach_handler_t  link_detach_handler;
+    dx_container_link_handler_t writable_handler;
 
     //
+    // link_detach_handler - Invoked when a link is detached.
+    //
+    dx_container_link_detach_handler_t link_detach_handler;
+
+    //===================
     // Node-Type Handlers
+    //===================
+
+    //
+    // node_created_handler - Invoked when a new instance of the node-type is created.
     //
     dx_container_node_handler_t  node_created_handler;
+
+    //
+    // node_destroyed_handler - Invoked when an instance of the node type is destroyed.
+    //
     dx_container_node_handler_t  node_destroyed_handler;
+
+    //
+    // inbound_conn_open_handler - Invoked when an incoming connection (via listener)
+    //                             is established.
+    //
     dx_container_conn_handler_t  inbound_conn_open_handler;
+
+    //
+    // outbound_conn_open_handler - Invoked when an outgoing connection (via connector)
+    //                              is established.
+    //
     dx_container_conn_handler_t  outbound_conn_open_handler;
 } dx_node_type_t;
 
 
 int dx_container_register_node_type(dx_dispatch_t *dispatch, const dx_node_type_t *nt);
 
-void dx_container_set_default_node_type(dx_dispatch_t        *dispatch,
-                                        const dx_node_type_t *nt,
-                                        void                 *node_context,
-                                        dx_dist_mode_t        supported_dist);
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t        *dispatch,
+                                              const dx_node_type_t *nt,
+                                              void                 *node_context,
+                                              dx_dist_mode_t        supported_dist);
 
 dx_node_t *dx_container_create_node(dx_dispatch_t        *dispatch,
                                     const dx_node_type_t *nt,
@@ -116,6 +162,26 @@ pn_terminus_t *dx_link_remote_target(dx_
 void dx_link_activate(dx_link_t *link);
 void dx_link_close(dx_link_t *link);
 
+/**
+ * Important: dx_delivery must never be called twice in a row without an intervening pn_link_advance.
+ *            The Dispatch architecture provides a hook for discovering when an outgoing link is writable
+ *            and has credit.  When a link is writable, a delivery is allocated, written, and advanced
+ *            in one operation.  If a backlog of pending deliveries is created, an assertion will be
+ *            thrown.
+ */
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag);
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition);
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer);
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery);
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context);
+void *dx_delivery_context(dx_delivery_t *delivery);
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery);
+void dx_delivery_settle(dx_delivery_t *delivery);
+bool dx_delivery_settled(dx_delivery_t *delivery);
+bool dx_delivery_disp_changed(dx_delivery_t *delivery);
+uint64_t dx_delivery_disp(dx_delivery_t *delivery);
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery);
+
 
 typedef struct dx_link_item_t dx_link_item_t;
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/message.h Fri Sep 20 18:59:30 2013
@@ -19,12 +19,13 @@
  * under the License.
  */
 
-#include <proton/engine.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/buffer.h>
 #include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/container.h>
 
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
@@ -88,34 +89,94 @@ typedef enum {
     DX_FIELD_REPLY_TO_GROUP_ID
 } dx_message_field_t;
 
-//
-// Functions for allocation
-//
+
+/**
+ * Allocate a new message.
+ *
+ * @return A pointer to a dx_message_t that is the sole reference to a newly allocated
+ *         message.
+ */
 dx_message_t *dx_allocate_message(void);
-void dx_free_message(dx_message_t *qm);
-dx_message_t *dx_message_copy(dx_message_t *qm);
-int dx_message_persistent(dx_message_t *qm);
-int dx_message_in_memory(dx_message_t *qm);
-
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg);
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg);
 
-//
-// Functions for received messages
-//
-dx_message_t *dx_message_receive(pn_delivery_t *delivery);
-void dx_message_send(dx_message_t *msg, pn_link_t *link);
+/**
+ * Free a message reference.  If this is the last reference to the message, free the
+ * message as well.
+ *
+ * @param msg A pointer to a dx_message_t that is no longer needed.
+ */
+void dx_free_message(dx_message_t *msg);
+
+/**
+ * Make a new reference to an existing message.
+ *
+ * @param msg A pointer to a dx_message_t referencing a message.
+ * @return A new pointer to the same referenced message.
+ */
+dx_message_t *dx_message_copy(dx_message_t *msg);
+
+/**
+ * Retrieve the delivery annotations from a message.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the message.
+ *            The caller MUST NOT free the parsed field.
+ *
+ * @param msg Pointer to a received message.
+ * @return Pointer to the parsed field for the delivery annotations.  If the message doesn't
+ *         have delivery annotations, the return value shall be NULL.
+ */
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *msg);
 
+/**
+ * Set the delivery annotations for the message.  If the message already has delivery annotations,
+ * they will be overwritten/replaced by the new field.
+ *
+ * @param msg Pointer to a received message.
+ * @param da Pointer to a composed field representing the new delivery annotations of the message.
+ *           If null, the message will not have a delivery annotations field.
+ *           IMPORTANT: The message will not take ownership of the composed field.  The
+ *                      caller is responsible for freeing it after this call.  Since the contents
+ *                      are copied into the message, it is safe to free the composed field
+ *                      any time after the call to this function.
+ */
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da);
+
+/**
+ * Receive message data via a delivery.  This function may be called more than once on the same
+ * delivery if the message spans multiple frames.  Once a complete message has been received, this
+ * function shall return the message.
+ *
+ * @param delivery An incoming delivery from a link
+ * @return A pointer to the complete message or 0 if the message is not yet complete.
+ */
+dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+
+/**
+ * Send the message outbound on an outgoing link.
+ *
+ * @param msg A pointer to a message to be sent.
+ * @param link The outgoing link on which to send the message.
+ */
+void dx_message_send(dx_message_t *msg, dx_link_t *link);
+
+/**
+ * Check that the message is well-formed up to a certain depth.  Any part of the message that is
+ * beyond the specified depth is not checked for validity.
+ */
 int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
+
+/**
+ * Return an iterator for the requested message field.  If the field is not in the message,
+ * return NULL.
+ *
+ * @param msg A pointer to a message.
+ * @param field The field to be returned via iterator.
+ * @return A field iterator that spans the requested field.
+ */
 dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field);
 
 ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);
 ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, void *buffer);
 
-pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm);
-
 //
 // Functions for composed messages
 //

Modified: qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/include/qpid/dispatch/router.h Fri Sep 20 18:59:30 2013
@@ -29,9 +29,9 @@ typedef struct dx_address_t dx_address_t
 
 typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
 
+const char *dx_router_id(const dx_dispatch_t *dx);
 
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
-                                         bool                  is_local,
                                          const char           *address,
                                          dx_router_message_cb  handler,
                                          void                 *context);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
     DEQ_INIT(agent->out_fifo);
     agent->lock    = sys_mutex();
     agent->timer   = dx_timer(dx, dx_agent_deferred_handler, agent);
-    agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
+    agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
 
     return agent;
 }

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/alloc.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,24 +24,34 @@
 #include <memory.h>
 #include <stdio.h>
 
-typedef struct item_t item_t;
+typedef struct dx_alloc_type_t dx_alloc_type_t;
+typedef struct dx_alloc_item_t dx_alloc_item_t;
 
-struct item_t {
-    DEQ_LINKS(item_t);
+struct dx_alloc_type_t {
+    DEQ_LINKS(dx_alloc_type_t);
     dx_alloc_type_desc_t *desc;
 };
 
-DEQ_DECLARE(item_t, item_list_t);
+DEQ_DECLARE(dx_alloc_type_t, dx_alloc_type_list_t);
+
+
+struct dx_alloc_item_t {
+    DEQ_LINKS(dx_alloc_item_t);
+};
+
+DEQ_DECLARE(dx_alloc_item_t, dx_alloc_item_list_t);
+
 
 struct dx_alloc_pool_t {
-    item_list_t free_list;
+    dx_alloc_item_list_t free_list;
 };
 
 dx_alloc_config_t dx_alloc_default_config_big   = {16,  32, 0};
 dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0};
+#define BIG_THRESHOLD 256
 
-static sys_mutex_t *init_lock;
-static item_list_t  type_list;
+static sys_mutex_t          *init_lock;
+static dx_alloc_type_list_t  type_list;
 
 static void dx_alloc_init(dx_alloc_type_desc_t *desc)
 {
@@ -56,7 +66,7 @@ static void dx_alloc_init(dx_alloc_type_
 
     if (!desc->global_pool) {
         if (desc->config == 0)
-            desc->config = desc->total_size > 256 ?
+            desc->config = desc->total_size > BIG_THRESHOLD ?
                 &dx_alloc_default_config_big : &dx_alloc_default_config_small;
 
         assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
@@ -66,12 +76,12 @@ static void dx_alloc_init(dx_alloc_type_
         desc->lock = sys_mutex();
         desc->stats = NEW(dx_alloc_stats_t);
         memset(desc->stats, 0, sizeof(dx_alloc_stats_t));
-    }
 
-    item_t *type_item = NEW(item_t);
-    DEQ_ITEM_INIT(type_item);
-    type_item->desc = desc;
-    DEQ_INSERT_TAIL(type_list, type_item);
+        dx_alloc_type_t *type_item = NEW(dx_alloc_type_t);
+        DEQ_ITEM_INIT(type_item);
+        type_item->desc = desc;
+        DEQ_INSERT_TAIL(type_list, type_item);
+    }
 
     sys_mutex_unlock(init_lock);
 }
@@ -103,7 +113,7 @@ void *dx_alloc(dx_alloc_type_desc_t *des
     // list and return it.  Since everything we've touched is thread-local,
     // there is no need to acquire a lock.
     //
-    item_t *item = DEQ_HEAD(pool->free_list);
+    dx_alloc_item_t *item = DEQ_HEAD(pool->free_list);
     if (item) {
         DEQ_REMOVE_HEAD(pool->free_list);
         return &item[1];
@@ -130,11 +140,10 @@ void *dx_alloc(dx_alloc_type_desc_t *des
         // Allocate a full batch from the heap and put it on the thread list.
         //
         for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
-            item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
+            item = (dx_alloc_item_t*) malloc(sizeof(dx_alloc_item_t) + desc->total_size);
             if (item == 0)
                 break;
             DEQ_ITEM_INIT(item);
-            item->desc = desc;
             DEQ_INSERT_TAIL(pool->free_list, item);
             desc->stats->held_by_threads++;
             desc->stats->total_alloc_from_heap++;
@@ -154,7 +163,7 @@ void *dx_alloc(dx_alloc_type_desc_t *des
 
 void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p)
 {
-    item_t          *item = ((item_t*) p) - 1;
+    dx_alloc_item_t *item = ((dx_alloc_item_t*) p) - 1;
     int              idx;
 
     //
@@ -217,7 +226,7 @@ static void alloc_schema_handler(void *c
 
 static void alloc_query_handler(void* context, const char *id, void *cor)
 {
-    item_t *item = DEQ_HEAD(type_list);
+    dx_alloc_type_t *item = DEQ_HEAD(type_list);
 
     while (item) {
         dx_agent_value_string(cor, "name", item->desc->type_name);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/alloc_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,30 +21,10 @@
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/buffer.h>
 #include <qpid/dispatch/amqp.h>
-#include "message_private.h"
 #include "compose_private.h"
 #include <memory.h>
 
-typedef struct dx_composite_t {
-    DEQ_LINKS(struct dx_composite_t);
-    int                 isMap;
-    uint32_t            count;
-    uint32_t            length;
-    dx_field_location_t length_location;
-    dx_field_location_t count_location;
-} dx_composite_t;
-
-ALLOC_DECLARE(dx_composite_t);
 ALLOC_DEFINE(dx_composite_t);
-DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
-
-
-struct dx_composed_field_t {
-    dx_buffer_list_t buffers;
-    dx_field_stack_t fieldStack;
-};
-
-ALLOC_DECLARE(dx_composed_field_t);
 ALLOC_DEFINE(dx_composed_field_t);
 
 
@@ -197,7 +177,7 @@ static void dx_compose_end_composite(dx_
     //
     dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack);
     if (enclosing) {
-        enclosing->length += 4 + comp->length;
+        enclosing->length += (comp->length - 4); // the length and count were already accounted for
         enclosing->count++;
     }
 
@@ -233,12 +213,14 @@ void dx_compose_free(dx_composed_field_t
     while (buf) {
         DEQ_REMOVE_HEAD(field->buffers);
         dx_free_buffer(buf);
+        buf = DEQ_HEAD(field->buffers);
     }
 
     dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
     while (comp) {
         DEQ_REMOVE_HEAD(field->fieldStack);
         free_dx_composite_t(comp);
+        comp = DEQ_HEAD(field->fieldStack);
     }
 
     free_dx_composed_field_t(field);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,28 @@
  */
 
 #include <qpid/dispatch/compose.h>
+#include "message_private.h"
 
 dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field);
 
+typedef struct dx_composite_t {
+    DEQ_LINKS(struct dx_composite_t);
+    int                 isMap;
+    uint32_t            count;
+    uint32_t            length;
+    dx_field_location_t length_location;
+    dx_field_location_t count_location;
+} dx_composite_t;
+
+ALLOC_DECLARE(dx_composite_t);
+DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
+
+
+struct dx_composed_field_t {
+    dx_buffer_list_t buffers;
+    dx_field_stack_t fieldStack;
+};
+
+ALLOC_DECLARE(dx_composed_field_t);
+
 #endif

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/config.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/config.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/config.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/config.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/log.h>
 
-#define PYTHON_MODULE "config"
+#define PYTHON_MODULE "qpid.dispatch.config"
 
 static const char *log_module = "CONFIG";
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/config_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/container.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/container.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/container.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t);
 ALLOC_DEFINE(dx_node_t);
 ALLOC_DEFINE(dx_link_item_t);
 
+
 struct dx_link_t {
     pn_link_t *pn_link;
     void      *context;
@@ -56,6 +57,19 @@ struct dx_link_t {
 ALLOC_DECLARE(dx_link_t);
 ALLOC_DEFINE(dx_link_t);
 
+
+struct dx_delivery_t {
+    pn_delivery_t *pn_delivery;
+    dx_delivery_t *peer;
+    void          *context;
+    uint64_t       disposition;
+    dx_link_t     *link;
+};
+
+ALLOC_DECLARE(dx_delivery_t);
+ALLOC_DEFINE(dx_delivery_t);
+
+
 typedef struct dxc_node_type_t {
     DEQ_LINKS(struct dxc_node_type_t);
     const dx_node_type_t *ntype;
@@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_lin
 }
 
 
-static void process_receive(pn_delivery_t *delivery)
+static void do_receive(pn_delivery_t *pnd)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
+    pn_link_t     *pn_link  = pn_delivery_link(pnd);
+    dx_link_t     *link     = (dx_link_t*) pn_link_get_context(pn_link);
+    dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
 
     if (link) {
         dx_node_t *node = link->node;
         if (node) {
+            if (!delivery) {
+                delivery = new_dx_delivery_t();
+                delivery->pn_delivery = pnd;
+                delivery->peer        = 0;
+                delivery->context     = 0;
+                delivery->disposition = 0;
+                delivery->link        = link;
+                pn_delivery_set_context(pnd, delivery);
+            }
+
             node->ntype->rx_handler(node->context, link, delivery);
             return;
         }
@@ -198,34 +223,18 @@ static void process_receive(pn_delivery_
     //
     pn_link_advance(pn_link);
     pn_link_flow(pn_link, 1);
-    pn_delivery_update(delivery, PN_REJECTED);
-    pn_delivery_settle(delivery);
-}
-
-
-static void do_send(pn_delivery_t *delivery)
-{
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
-
-    if (link) {
-        dx_node_t *node = link->node;
-        if (node) {
-            node->ntype->tx_handler(node->context, link, delivery);
-            return;
-        }
-    }
-
-    // TODO - Cancel the delivery
+    pn_delivery_update(pnd, PN_REJECTED);
+    pn_delivery_settle(pnd);
 }
 
 
-static void do_updated(pn_delivery_t *delivery)
+static void do_updated(pn_delivery_t *pnd)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
+    pn_link_t     *pn_link  = pn_delivery_link(pnd);
+    dx_link_t     *link     = (dx_link_t*) pn_link_get_context(pn_link);
+    dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
 
-    if (link) {
+    if (link && delivery) {
         dx_node_t *node = link->node;
         if (node)
             node->ntype->disp_handler(node->context, link, delivery);
@@ -239,15 +248,15 @@ static int close_handler(void* unused, p
     // Close all links, passing False as the 'closed' argument.  These links are not
     // being properly 'detached'.  They are being orphaned.
     //
-    pn_link_t *pn_link = pn_link_head(conn, 0);
+    pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (pn_link) {
         dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
         dx_node_t *node = link->node;
-        if (node)
+        if (node && link)
             node->ntype->link_detach_handler(node->context, link, 0);
         pn_link_close(pn_link);
         free_dx_link_t(link);
-        pn_link = pn_link_next(pn_link, 0);
+        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
     }
 
     // teardown all sessions
@@ -305,9 +314,7 @@ static int process_handler(dx_container_
     delivery = pn_work_head(conn);
     while (delivery) {
         if (pn_delivery_readable(delivery))
-            process_receive(delivery);
-        else if (pn_delivery_writable(delivery))
-            do_send(delivery);
+            do_receive(delivery);
 
         if (pn_delivery_updated(delivery)) {
             do_updated(delivery);
@@ -318,15 +325,15 @@ static int process_handler(dx_container_
     }
 
     //
-    // Step 2.5: Traverse all of the links on the connection looking for
-    // outgoing links with non-zero credit.  Call the attached node's
-    // writable handler for such links.
+    // Step 2.5: Call the attached node's writable handler for all active links
+    // on the connection.  Note that in Dispatch, links are considered
+    // bidirectional.  Incoming and outgoing pertain only to deliveries, and
+    // deliveries are a subset of the traffic that flows in both directions on links.
     //
     pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
     while (pn_link) {
         assert(pn_session_connection(pn_link_session(pn_link)) == conn);
-        if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
-            event_count += do_writable(pn_link);
+        event_count += do_writable(pn_link);
         pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
     }
 
@@ -513,10 +520,10 @@ int dx_container_register_node_type(dx_d
 }
 
 
-void dx_container_set_default_node_type(dx_dispatch_t        *dx,
-                                        const dx_node_type_t *nt,
-                                        void                 *context,
-                                        dx_dist_mode_t        supported_dist)
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t        *dx,
+                                              const dx_node_type_t *nt,
+                                              void                 *context,
+                                              dx_dist_mode_t        supported_dist)
 {
     dx_container_t *container = dx->container;
 
@@ -530,6 +537,8 @@ void dx_container_set_default_node_type(
         container->default_node = 0;
         dx_log(module, LOG_TRACE, "Default node removed");
     }
+
+    return container->default_node;
 }
 
 
@@ -699,3 +708,110 @@ void dx_link_close(dx_link_t *link)
 }
 
 
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
+{
+    pn_link_t *pnl = dx_link_pn(link);
+
+    //
+    // If there is a current delivery on this outgoing link, something
+    // is wrong with the delivery algorithm.  We assume that the current
+    // delivery ('pnd' below) is the one created by pn_delivery.  If it is
+    // not, then my understanding of how proton works is incorrect.
+    //
+    assert(!pn_link_current(pnl));
+
+    pn_delivery(pnl, tag);
+    pn_delivery_t *pnd = pn_link_current(pnl);
+
+    if (!pnd)
+        return 0;
+
+    dx_delivery_t *delivery = new_dx_delivery_t();
+    delivery->pn_delivery = pnd;
+    delivery->peer        = 0;
+    delivery->context     = 0;
+    delivery->disposition = 0;
+    delivery->link        = link;
+    pn_delivery_set_context(pnd, delivery);
+
+    return delivery;
+}
+
+
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition)
+{
+    if (delivery->pn_delivery) {
+        if (final_disposition > 0)
+            pn_delivery_update(delivery->pn_delivery, final_disposition);
+        pn_delivery_set_context(delivery->pn_delivery, 0);
+        pn_delivery_settle(delivery->pn_delivery);
+    }
+    if (delivery->peer)
+        delivery->peer->peer = 0;
+    free_dx_delivery_t(delivery);
+}
+
+
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer)
+{
+    delivery->peer = peer;
+}
+
+
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context)
+{
+    delivery->context = context;
+}
+
+
+void *dx_delivery_context(dx_delivery_t *delivery)
+{
+    return delivery->context;
+}
+
+
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery)
+{
+    return delivery->peer;
+}
+
+
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery)
+{
+    return delivery->pn_delivery;
+}
+
+
+void dx_delivery_settle(dx_delivery_t *delivery)
+{
+    if (delivery->pn_delivery) {
+        pn_delivery_settle(delivery->pn_delivery);
+        delivery->pn_delivery = 0;
+    }
+}
+
+
+bool dx_delivery_settled(dx_delivery_t *delivery)
+{
+    return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool dx_delivery_disp_changed(dx_delivery_t *delivery)
+{
+    return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t dx_delivery_disp(dx_delivery_t *delivery)
+{
+    delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+    return delivery->disposition;
+}
+
+
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery)
+{
+    return delivery->link;
+}
+

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ DEQ_DECLARE(hash_item_t, items_t);
 
 
 typedef struct bucket_t {
-    items_t items;    
+    items_t items;
 } bucket_t;
 
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/iovec.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/iterator.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -367,7 +367,7 @@ dx_field_iterator_t *dx_field_iterator_s
 void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length)
 {
     // TODO - Make this more efficient.
-    for (uint8_t idx = 0; idx < length; idx++)
+    for (uint8_t idx = 0; idx < length && !dx_field_iterator_end(iter); idx++)
         dx_field_iterator_octet(iter);
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org