You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/09/18 15:27:43 UTC

[qpid-dispatch] branch master updated: DISPATCH-1406: Add system test for the DISPATCH-1406 issue.

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 759dfd6  DISPATCH-1406: Add system test for the DISPATCH-1406 issue.
759dfd6 is described below

commit 759dfd6be57c2d32a371b1529194831356de61c5
Author: Kenneth Giusti <kg...@redhat.com>
AuthorDate: Fri Sep 6 13:54:42 2019 -0400

    DISPATCH-1406: Add system test for the DISPATCH-1406 issue.
    
    This closes #564
---
 tests/CMakeLists.txt              |   8 +
 tests/system_test.py              | 104 ++++++++-
 tests/system_tests_edge_router.py |   2 +-
 tests/system_tests_link_routes.py |   4 +-
 tests/system_tests_router_mesh.py | 215 ++++++++++++++++++
 tests/test-receiver.c             | 233 ++++++++++++++++++++
 tests/test-sender.c               | 450 ++++++++++++++++++++++++++++++++++++++
 7 files changed, 1001 insertions(+), 15 deletions(-)

diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2fcff52..7b2d03b 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -52,6 +52,13 @@ set(unit_test_size_SOURCES
 add_executable(unit_tests_size ${unit_test_size_SOURCES})
 target_link_libraries(unit_tests_size qpid-dispatch)
 
+add_executable(test-sender test-sender.c)
+target_link_libraries(test-sender ${Proton_LIBRARIES})
+
+add_executable(test-receiver test-receiver.c)
+target_link_libraries(test-receiver ${Proton_LIBRARIES})
+
+
 set(TEST_WRAP ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/run.py)
 
 add_test(unit_tests_size_10000 ${TEST_WRAP} unit_tests_size 10000)
@@ -127,6 +134,7 @@ foreach(py_test_module
     system_tests_multi_phase
     system_tests_multicast
     system_tests_fallback_dest
+    system_tests_router_mesh
     )
 
   add_test(${py_test_module} ${TEST_WRAP} unit2 -v ${py_test_module})
diff --git a/tests/system_test.py b/tests/system_test.py
index f099560..366f63f 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -47,12 +47,15 @@ import uuid
 
 import proton
 from proton import Message
+from proton import Delivery
 from proton.handlers import MessagingHandler
 from proton.utils import BlockingConnection
 from proton.reactor import AtLeastOnce, Container
 from qpid_dispatch.management.client import Node
 from qpid_dispatch_internal.compat import dict_iteritems
 
+is_python2 = sys.version_info[0] == 2
+
 # Optional modules
 MISSING_MODULES = []
 
@@ -268,6 +271,39 @@ class Process(subprocess.Popen):
         if self.expect != None and self.expect != status:
             error("exit code %s, expected %s" % (status, self.expect))
 
+    def wait(self, timeout=None):
+        """
+        Add support for a timeout when using Python 2
+        """
+        if timeout is None:
+            return super(Process, self).wait()
+
+        if is_python2:
+            start = time.time()
+            while True:
+                rc = super(Process, self).poll()
+                if rc is not None:
+                    return rc
+                if time.time() - start >= timeout:
+                    raise Exception("Process did not terminate")
+                time.sleep(0.1)
+        else:
+            return super(Process, self).wait(timeout=timeout)
+
+    def communicate(self, input=None, timeout=None):
+        """
+        Add support for a timeout when using Python 2
+        """
+        if timeout is None:
+            return super(Process, self).communicate(input=input)
+
+        if is_python2:
+            self.wait(timeout=timeout)
+            return super(Process, self).communicate(input=input)
+
+        return super(Process, self).communicate(input=input,
+                                                timeout=timeout)
+
 
 class Config(object):
     """Base class for configuration objects that provide a convenient
@@ -798,13 +834,24 @@ class AsyncTestSender(MessagingHandler):
     A simple sender that runs in the background and sends 'count' messages to a
     given target.
     """
-    def __init__(self, address, target, count=1, body=None, container_id=None):
-        super(AsyncTestSender, self).__init__()
+    class TestSenderException(Exception):
+        def __init__(self, error=None):
+            super(AsyncTestSender.TestSenderException, self).__init__(error)
+
+    def __init__(self, address, target, count=1, message=None, container_id=None):
+        super(AsyncTestSender, self).__init__(auto_accept=False,
+                                              auto_settle=False)
         self.address = address
         self.target = target
-        self.count = count
-        self._unaccepted = count
-        self._body = body or "test"
+        self.total = count
+        self.accepted = 0
+        self.released = 0
+        self.modified = 0
+        self.rejected = 0
+        self.sent = 0
+        self.error = None
+
+        self._message = message or Message(body="test")
         self._container = Container(self)
         cid = container_id or "ATS-%s:%s" % (target, uuid.uuid4())
         self._container.container_id = cid
@@ -821,6 +868,8 @@ class AsyncTestSender(MessagingHandler):
         # don't stop it - wait until everything is sent
         self._thread.join(timeout=TIMEOUT)
         assert not self._thread.is_alive(), "sender did not complete"
+        if self.error:
+            raise AsyncTestSender.TestSenderException(self.error)
 
     def on_start(self, event):
         self._conn = self._container.connect(self.address)
@@ -831,16 +880,47 @@ class AsyncTestSender(MessagingHandler):
                                                      options=AtLeastOnce())
 
     def on_sendable(self, event):
-        if self.count:
-            self._sender.send(Message(body=self._body))
-            self.count -= 1
-
-    def on_accepted(self, event):
-        self._unaccepted -= 1;
-        if self._unaccepted == 0:
+        if self.sent < self.total:
+            self._sender.send(self._message)
+            self.sent += 1
+
+    def _check_if_done(self):
+        done = (self.sent == self.total
+                and (self.accepted + self.released + self.modified
+                     + self.rejected) == self.sent)
+        if done:
             self._conn.close()
             self._conn = None
 
+    def on_accepted(self, event):
+        self.accepted += 1;
+        event.delivery.settle()
+        self._check_if_done()
+
+    def on_released(self, event):
+        # for some reason Proton 'helpfully' calls on_released even though the
+        # delivery state is actually MODIFIED
+        if event.delivery.remote_state == Delivery.MODIFIED:
+            return self.on_modified(event)
+        self.released += 1
+        event.delivery.settle()
+        self._check_if_done()
+
+    def on_modified(self, event):
+        self.modified += 1
+        event.delivery.settle()
+        self._check_if_done()
+
+    def on_rejected(self, event):
+        self.rejected += 1
+        event.delivery.settle()
+        self._check_if_done()
+
+    def on_link_error(self, event):
+        self.error = "link error:%s" % str(event.link.remote_condition)
+        self._conn.close()
+        self._conn = None
+
 
 class QdManager(object):
     """
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index e98c6ef..ee89b02 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -1503,7 +1503,7 @@ class LinkRouteProxyTest(TestCase):
         rx = AsyncTestReceiver(self.EB1.listener, 'CfgLinkRoute1/foo',
                                wait=False, recover_link=True)
         tx = AsyncTestSender(self.EA1.listener, 'CfgLinkRoute1/foo',
-                             body="HEY HO LET'S GO!")
+                             message=Message(body="HEY HO LET'S GO!"))
         tx.wait()
 
         msg = rx.queue.get(timeout=TIMEOUT)
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 1c59d9a..33658b4 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -1820,7 +1820,7 @@ class ConnectionLinkRouteTest(TestCase):
         s = AsyncTestSender(self.QDR_A.addresses[0],
                             "flea.B",
                             container_id="flea.BSender",
-                            body="SENDING TO flea.B",
+                            message=Message(body="SENDING TO flea.B"),
                             count=COUNT)
         s.wait()   # for sender to complete
         for i in range(COUNT):
@@ -1836,7 +1836,7 @@ class ConnectionLinkRouteTest(TestCase):
         s = AsyncTestSender(self.QDR_B.addresses[0],
                             "flea.A",
                             container_id="flea.ASender",
-                            body="SENDING TO flea.A",
+                            message=Message(body="SENDING TO flea.A"),
                             count=COUNT)
         s.wait()
         for i in range(COUNT):
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
new file mode 100644
index 0000000..fad0600
--- /dev/null
+++ b/tests/system_tests_router_mesh.py
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+import sys
+from time import sleep
+import unittest2 as unittest
+from signal import SIGINT
+from subprocess import PIPE
+
+from proton import Message
+
+from system_test import TestCase
+from system_test import Qdrouterd
+from system_test import QdManager
+from system_test import main_module
+from system_test import TIMEOUT
+from system_test import Process
+from system_test import AsyncTestSender
+
+
+class ThreeRouterTest(TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        """
+        Create a mesh of three routers.  Reject any links or messages sent to
+        an unavailable address.
+        """
+        super(ThreeRouterTest, cls).setUpClass()
+
+        def router(name, extra_config):
+            config = [
+
+                ('router', {'id': name,
+                            'mode': 'interior',
+                            "defaultDistribution": "unavailable"}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port(),
+                              "linkCapacity": '100'}),
+
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ] + extra_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+        cls.routers = []
+
+        inter_router_A = cls.tester.get_port()
+        inter_router_B = cls.tester.get_port()
+        inter_router_C = cls.tester.get_port()
+
+        router('RouterA',
+               [('listener', {'role': 'inter-router', 'port': inter_router_A}),
+                ('connector', {'role': 'inter-router', 'port': inter_router_B})])
+
+        router('RouterB',
+               [('listener', {'role': 'inter-router', 'port': inter_router_B}),
+                ('connector', {'role': 'inter-router', 'port': inter_router_C})])
+
+        router('RouterC',
+               [('listener', {'role': 'inter-router', 'port': inter_router_C}),
+                ('connector', {'role': 'inter-router', 'port': inter_router_A})])
+
+        cls.RouterA = cls.routers[0]
+        cls.RouterB = cls.routers[1]
+        cls.RouterC = cls.routers[2]
+
+        cls.RouterA.wait_router_connected('RouterB')
+        cls.RouterA.wait_router_connected('RouterC')
+        cls.RouterB.wait_router_connected('RouterA')
+        cls.RouterB.wait_router_connected('RouterC')
+        cls.RouterC.wait_router_connected('RouterA')
+        cls.RouterC.wait_router_connected('RouterB')
+
+    def server_address(self, router):
+        return router.addresses[0]
+
+    def server_port(self, router):
+        return router.ports[0]  # first listener is for client connection
+
+    def server_host(self, router):
+        fam = router.ports_family
+        return router.get_host(fam.get(self.server_port(router),
+                                       "IPv4"))
+
+    def spawn_receiver(self, router, count, address, *extra_args):
+        cmd = ["test-receiver",
+               "-a", "%s:%s" % (self.server_host(router),
+                                self.server_port(router)),
+               "-c", str(count), "-s", address] + list(extra_args)
+        # env = dict(os.environ, PN_TRACE_FRM="1")
+        # return self.popen(cmd, expect=Process.EXIT_OK, env=env)
+        return self.popen(cmd, expect=Process.EXIT_OK)
+
+    def spawn_sender(self, router, count, address, *extra_args):
+        cmd = ["test-sender",
+               "-a", "%s:%s" % (self.server_host(router),
+                                self.server_port(router)),
+               "-c", str(count), "-t", address] + list(extra_args)
+        # env = dict(os.environ, PN_TRACE_FRM="1")
+        # return self.popen(cmd, expect=Process.EXIT_OK, env=env)
+        return self.popen(cmd, expect=Process.EXIT_OK)
+
+    def _rx_failover(self, extra_tx_args=None, extra_rx_args=None):
+        # Have a single sender transmit unsettled as fast as possible
+        # non-stop.  Have a single receiver that consumes a small number of
+        # messages before failing over to a different router in the mesh
+        extra_tx = extra_tx_args or []
+        extra_rx = extra_rx_args or []
+        total = 100
+        router_index = 0
+        tx = self.spawn_sender(self.RouterC, 0, "balanced/foo", *extra_tx)
+        while total > 0:
+            rx = self.spawn_receiver(self.routers[router_index], 5,
+                                     "balanced/foo", *extra_rx)
+            if rx.wait(timeout=TIMEOUT):
+                raise Exception("Receiver failed to consume all messages")
+            total -= 5
+            router_index += 1
+            if router_index == len(self.routers):
+                router_index = 0
+        tx.send_signal(SIGINT)
+        out_text, out_err = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("Sender failed: %s %s"
+                            % (out_text, out_err))
+
+    def test_01_rx_failover_clean(self):
+        """
+        Runs the receiver failover test.  In this test the receiver cleanly
+        shuts down the AMQP endpoint before failing over.
+        """
+        self._rx_failover()
+
+
+    def test_02_rx_failover_dirty(self):
+        """
+        Runs the receiver failover test.  In this test the receiver abruptly
+        drops the TCP connection simulating a client crash.
+        """
+        tcp_drop = ["-E"]
+        self._rx_failover(extra_rx_args=tcp_drop)
+
+    def test_03_unavailable_link_attach(self):
+        """
+        Attempt to attach a link to an unavailable address, expect the router
+        to detach it
+        """
+        ats = AsyncTestSender(self.server_address(self.RouterA),
+                              "an/unavailable/address")
+        try:
+            ats.wait()
+            self.assertTrue(False)  # expect exception
+        except AsyncTestSender.TestSenderException as exc:
+            self.assertTrue("link error" in ats.error)
+
+    def test_04_unavailable_anonymous_link_attach(self):
+        """
+        Attempt to attach an anonymous link and send a message to an
+        unavailable address.  Expect to allow the link, but REJECT the message
+        """
+        message = Message(body="REJECTED!!!")
+        message.address = "another/unavailable/address"
+        ats = AsyncTestSender(self.server_address(self.RouterA),
+                              target=None,
+                              message=message)
+        ats.wait()
+        self.assertEqual(0, ats.accepted)
+        self.assertEqual(1, ats.rejected)
+
+    def test_05_unavailable_anonymous_link_send(self):
+        """
+        Attach an anonymous link and send to a configured address (no
+        subscribers).  Expect to allow the link, but RELEASE the message
+        """
+        message = Message(body="Release me, let me go...")
+        message.address = "closest/foo"
+        ats = AsyncTestSender(self.server_address(self.RouterA),
+                              target=None,
+                              message=message)
+        ats.wait()
+        self.assertEqual(0, ats.accepted)
+        # BUG DISPATCH-1418: shold be released!
+        # self.assertEqual(1, ats.released)
+        self.assertEqual(1, ats.rejected)
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
new file mode 100644
index 0000000..8ae189c
--- /dev/null
+++ b/tests/test-receiver.c
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+
+#define MAX_SIZE (1024 * 64)
+char in_buffer[MAX_SIZE];
+
+bool stop = false;
+
+int  credit_window = 1000;
+char *source_address = "test-address";  // name of the source node to receive from
+char _addr[] = "127.0.0.1:5672";
+char *host_address = _addr;
+char *container_name = "TestReceiver";
+bool drop_connection = false;
+
+pn_connection_t *pn_conn;
+pn_session_t *pn_ssn;
+pn_link_t *pn_link;
+pn_reactor_t *reactor;
+pn_message_t *in_message;       // holds the current received message
+
+uint64_t count = 0;
+uint64_t limit = 0;   // if > 0 stop after limit messages arrive
+
+
+static void signal_handler(int signum)
+{
+    signal(SIGINT,  SIG_IGN);
+    signal(SIGQUIT, SIG_IGN);
+
+    switch (signum) {
+    case SIGINT:
+    case SIGQUIT:
+        stop = true;
+        break;
+    default:
+        break;
+    }
+}
+
+
+// Called when reactor exits to clean up app_data
+//
+static void delete_handler(pn_handler_t *handler)
+{
+    if (in_message) {
+        pn_message_free(in_message);
+        in_message = NULL;
+    }
+}
+
+
+/* Process each event posted by the reactor.
+ */
+static void event_handler(pn_handler_t *handler,
+                          pn_event_t *event,
+                          pn_event_type_t type)
+{
+    switch (type) {
+
+    case PN_CONNECTION_INIT: {
+        // Create and open all the endpoints needed to send a message
+        //
+        in_message = pn_message();
+        pn_connection_open(pn_conn);
+        pn_ssn = pn_session(pn_conn);
+        pn_session_open(pn_ssn);
+        pn_link = pn_receiver(pn_ssn, "MyReceiver");
+        pn_terminus_set_address(pn_link_source(pn_link), source_address);
+        pn_link_open(pn_link);
+        // cannot receive without granting credit:
+        pn_link_flow(pn_link, credit_window);
+    } break;
+
+    case PN_DELIVERY: {
+        // A message has been received
+        //
+        if (stop) break;  // silently discard any further messages
+
+        pn_delivery_t *dlv = pn_event_delivery(event);
+        if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+            // A full message has arrived
+            count += 1;
+            pn_delivery_update(dlv, PN_ACCEPTED);
+            pn_delivery_settle(dlv);  // dlv is now freed
+
+            if (pn_link_credit(pn_link) <= credit_window/2) {
+                // Grant enough credit to bring it up to CAPACITY:
+                pn_link_flow(pn_link, credit_window - pn_link_credit(pn_link));
+            }
+
+            if (limit && count == limit) {
+                stop = true;
+                pn_reactor_wakeup(reactor);
+            }
+        }
+    } break;
+
+    default:
+        break;
+    }
+}
+
+static void usage(void)
+{
+    printf("Usage: receiver <options>\n");
+    printf("-a      \tThe address:port of the server [%s]\n", host_address);
+    printf("-c      \tExit after N messages arrive (0 == run forever) [%"PRIu64"]\n", limit);
+    printf("-i      \tContainer name [%s]\n", container_name);
+    printf("-s      \tSource address [%s]\n", source_address);
+    printf("-w      \tCredit window [%d]\n", credit_window);
+    printf("-E      \tExit without cleanly closing the connection [off]\n");
+    exit(1);
+}
+
+
+int main(int argc, char** argv)
+{
+    /* create a handler for the connection's events.
+     */
+    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
+    pn_handler_add(handler, pn_handshaker());
+
+    /* command line options */
+    opterr = 0;
+    int c;
+    while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
+        switch(c) {
+        case 'h': usage(); break;
+        case 'a': host_address = optarg; break;
+        case 'c':
+            if (sscanf(optarg, "%"PRIu64, &limit) != 1)
+                usage();
+            break;
+        case 'i': container_name = optarg; break;
+        case 's': source_address = optarg; break;
+        case 'w':
+            if (sscanf(optarg, "%d", &credit_window) != 1 || credit_window <= 0)
+                usage();
+            break;
+        case 'E': drop_connection = true;  break;
+
+        default:
+            usage();
+            break;
+        }
+    }
+
+    signal(SIGQUIT, signal_handler);
+    signal(SIGINT,  signal_handler);
+
+    char *host = host_address;
+    if (strncmp(host, "amqp://", 7) == 0)
+        host += 7;
+    char *port = strrchr(host, ':');
+    if (port) {
+        *port++ = 0;
+    } else {
+        port = "5672";
+    }
+
+    reactor = pn_reactor();
+    pn_conn = pn_reactor_connection_to_host(reactor,
+                                            host,
+                                            port,
+                                            handler);
+
+    // the container name should be unique for each client
+    pn_connection_set_container(pn_conn, container_name);
+    pn_connection_set_hostname(pn_conn, host);
+
+    // periodic wakeup
+    pn_reactor_set_timeout(reactor, 1000);
+
+    pn_reactor_start(reactor);
+
+    while (pn_reactor_process(reactor)) {
+        if (stop) {
+            if (drop_connection)  // hard exit
+                exit(0);
+            // close the endpoints this will cause pn_reactor_process() to
+            // eventually break the loop
+            if (pn_link) pn_link_close(pn_link);
+            if (pn_ssn) pn_session_close(pn_ssn);
+            if (pn_conn) pn_connection_close(pn_conn);
+        }
+    }
+
+    if (pn_link) pn_link_free(pn_link);
+    if (pn_ssn) pn_session_free(pn_ssn);
+    if (pn_conn) pn_connection_close(pn_conn);
+
+    pn_reactor_free(reactor);
+
+    return 0;
+}
diff --git a/tests/test-sender.c b/tests/test-sender.c
new file mode 100644
index 0000000..92e182a
--- /dev/null
+++ b/tests/test-sender.c
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define ADD_ANNOTATIONS 1
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+#define BOOL2STR(b) ((b)?"true":"false")
+
+#define BODY_SIZE_SMALL  100
+#define BODY_SIZE_MEDIUM 2000
+#define BODY_SIZE_LARGE  60000  // NOTE: receiver.c max in buffer size = 64KB
+
+// body data - block of 0's
+//
+char _payload[BODY_SIZE_LARGE] = {0};
+pn_bytes_t body_data = {
+    .size  = 0,
+    .start = _payload,
+};
+
+bool stop = false;
+
+uint64_t limit = 1;               // # messages to send
+uint64_t count = 0;               // # sent
+uint64_t acked = 0;               // # of received acks
+
+// outcome counts
+uint64_t accepted = 0;
+uint64_t rejected = 0;
+uint64_t modified = 0;
+uint64_t released = 0;
+
+
+bool use_anonymous = false;       // use anonymous link if true
+bool presettle = false;           // true = send presettled
+bool add_annotations = false;
+int body_size = BODY_SIZE_SMALL;
+bool drop_connection = false;
+
+// buffer for encoded message
+char *encode_buffer = NULL;
+size_t encode_buffer_size = 0;    // size of malloced memory
+size_t encoded_data_size = 0;     // length of encoded content
+
+
+char *target_address = "test-address";
+char _addr[] = "127.0.0.1:5672";
+char *host_address = _addr;
+char *container_name = "TestSender";
+
+pn_connection_t *pn_conn;
+pn_session_t *pn_ssn;
+pn_link_t *pn_link;
+pn_reactor_t *reactor;
+pn_message_t *out_message;
+
+
+// odd-length long string
+const char big_string[] =
+    "+"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
+
+
+static void add_message_annotations(pn_message_t *out_message)
+{
+    // just a bunch of dummy MA
+    pn_data_t *annos = pn_message_annotations(out_message);
+    pn_data_clear(annos);
+    pn_data_put_map(annos);
+    pn_data_enter(annos);
+
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-key"), "my-key"));
+    pn_data_put_string(annos, pn_bytes(strlen("my-data"), "my-data"));
+
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-other-key"), "my-other-key"));
+    pn_data_put_string(annos, pn_bytes(strlen("my-other-data"), "my-other-data"));
+
+    // embedded map
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-map"), "my-map"));
+    pn_data_put_map(annos);
+    pn_data_enter(annos);
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key1"), "my-map-key1"));
+    pn_data_put_char(annos, 'X');
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key2"), "my-map-key2"));
+    pn_data_put_byte(annos, 0x12);
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key3"), "my-map-key3"));
+    pn_data_put_string(annos, pn_bytes(strlen("Are We Not Men?"), "Are We Not Men?"));
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-last-key"), "my-last-key"));
+    pn_data_put_binary(annos, pn_bytes(sizeof(big_string), big_string));
+    pn_data_exit(annos);
+
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-ulong"), "my-ulong"));
+    pn_data_put_ulong(annos, 0xDEADBEEFCAFEBEEF);
+
+    // embedded list
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-list"), "my-list"));
+    pn_data_put_list(annos);
+    pn_data_enter(annos);
+    pn_data_put_string(annos, pn_bytes(sizeof(big_string), big_string));
+    pn_data_put_double(annos, 3.1415);
+    pn_data_put_short(annos, 1966);
+    pn_data_exit(annos);
+
+    pn_data_put_symbol(annos, pn_bytes(strlen("my-bool"), "my-bool"));
+    pn_data_put_bool(annos, false);
+
+    pn_data_exit(annos);
+}
+
+
+void generate_message(void)
+{
+    if (!out_message) {
+        out_message = pn_message();
+    }
+
+    if (use_anonymous) {
+        pn_message_set_address(out_message, target_address);
+    }
+
+    pn_data_t *body = pn_message_body(out_message);
+    pn_data_clear(body);
+    body_data.size = body_size;
+    pn_data_put_binary(body, body_data);
+
+    if (add_annotations) {
+        add_message_annotations(out_message);
+    }
+
+    // now encode it
+
+    pn_data_rewind(pn_message_body(out_message));
+    if (!encode_buffer) {
+        encode_buffer_size = body_size + 512;
+        encode_buffer = malloc(encode_buffer_size);
+    }
+
+    int rc = 0;
+    size_t len = encode_buffer_size;
+    do {
+        rc = pn_message_encode(out_message, encode_buffer, &len);
+        if (rc == PN_OVERFLOW) {
+            free(encode_buffer);
+            encode_buffer_size *= 2;
+            encode_buffer = malloc(encode_buffer_size);
+            len = encode_buffer_size;
+        }
+    } while (rc == PN_OVERFLOW);
+
+    if (rc) {
+        perror("buffer encode failed");
+        exit(-1);
+    }
+
+    encoded_data_size = len;
+}
+
+
+static void signal_handler(int signum)
+{
+    signal(SIGINT,  SIG_IGN);
+    signal(SIGQUIT, SIG_IGN);
+
+    switch (signum) {
+    case SIGINT:
+    case SIGQUIT:
+        stop = true;
+        break;
+    default:
+        break;
+    }
+}
+
+
+static void delete_handler(pn_handler_t *handler)
+{
+    free(encode_buffer);
+    pn_message_free(out_message);
+}
+
+
+/* Process each event posted by the reactor.
+ */
+static void event_handler(pn_handler_t *handler,
+                          pn_event_t *event,
+                          pn_event_type_t type)
+{
+    switch (type) {
+
+    case PN_CONNECTION_INIT: {
+        // Create and open all the endpoints needed to send a message
+        //
+        pn_connection_open(pn_conn);
+        pn_session_t *pn_ssn = pn_session(pn_conn);
+        pn_session_open(pn_ssn);
+        pn_link_t *pn_link = pn_sender(pn_ssn, "MySender");
+        if (!use_anonymous) {
+            pn_terminus_set_address(pn_link_target(pn_link), target_address);
+        }
+        pn_link_open(pn_link);
+
+        acked = count;
+        generate_message();
+
+    } break;
+
+    case PN_LINK_FLOW: {
+        // the remote has given us some credit, now we can send messages
+        //
+        static long tag = 0;  // a simple tag generator
+        pn_link_t *sender = pn_event_link(event);
+        int credit = pn_link_credit(sender);
+
+        while (!stop && credit > 0 && (limit == 0 || count < limit)) {
+            --credit;
+            ++count;
+            ++tag;
+            pn_delivery_t *delivery = pn_delivery(sender, pn_dtag((const char *)&tag, sizeof(tag)));
+            pn_link_send(sender, encode_buffer, encoded_data_size);
+            pn_link_advance(sender);
+            if (presettle) {
+                pn_delivery_settle(delivery);
+                // fake terminal outcome
+                ++acked;
+                ++accepted;
+                if (limit && count == limit) {
+                    // no need to wait for acks
+                    stop = true;
+                    pn_reactor_wakeup(reactor);
+                }
+            }
+        }
+    } break;
+
+    case PN_DELIVERY: {
+        pn_delivery_t *dlv = pn_event_delivery(event);
+        if (pn_delivery_updated(dlv)) {
+            uint64_t rs = pn_delivery_remote_state(dlv);
+            switch (rs) {
+            case PN_RECEIVED:
+                // This is not a terminal state - it is informational, and the
+                // peer is still processing the message.
+                break;
+            case PN_ACCEPTED:
+                ++acked;
+                ++accepted;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_REJECTED:
+                ++acked;
+                ++rejected;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_RELEASED:
+                ++acked;
+                ++released;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_MODIFIED:
+                ++acked;
+                ++modified;
+                pn_delivery_settle(dlv);
+                break;
+
+            default:
+                break;
+            }
+
+            if (limit && acked == limit) {
+                // initiate clean shutdown of the endpoints
+                stop = true;
+                pn_reactor_wakeup(reactor);
+            }
+        }
+    } break;
+
+    default:
+        break;
+    }
+}
+
+static void usage(void)
+{
+  printf("Usage: sender <options>\n");
+  printf("-a      \tThe address:port of the server [%s]\n", host_address);
+  printf("-c      \t# of messages to send, 0 == nonstop [%"PRIu64"]\n", limit);
+  printf("-i      \tContainer name [%s]\n", container_name);
+  printf("-n      \tUse an anonymous link [%s]\n", BOOL2STR(use_anonymous));
+  printf("-s      \tBody size in bytes ('s'=%d 'm'=%d 'l'=%d) [%d]\n",
+         BODY_SIZE_SMALL, BODY_SIZE_MEDIUM, BODY_SIZE_LARGE, body_size);
+  printf("-t      \tTarget address [%s]\n", target_address);
+  printf("-u      \tSend all messages presettled [%s]\n", BOOL2STR(presettle));
+  printf("-M      \tAdd dummy Message Annotations section [off]\n");
+  printf("-E      \tExit without cleanly closing the connection [off]\n");
+  exit(1);
+}
+
+int main(int argc, char** argv)
+{
+    /* command line options */
+    opterr = 0;
+    int c;
+    while ((c = getopt(argc, argv, "ha:c:i:ns:t:uME")) != -1) {
+        switch(c) {
+        case 'h': usage(); break;
+        case 'a': host_address = optarg; break;
+        case 'c':
+            if (sscanf(optarg, "%"PRIu64, &limit) != 1)
+                usage();
+            break;
+        case 'i': container_name = optarg; break;
+        case 'n': use_anonymous = true; break;
+        case 's':
+            switch (optarg[0]) {
+            case 's': body_size = BODY_SIZE_SMALL; break;
+            case 'm': body_size = BODY_SIZE_MEDIUM; break;
+            case 'l': body_size = BODY_SIZE_LARGE; break;
+            default:
+                usage();
+            }
+            break;
+        case 't': target_address = optarg; break;
+        case 'u': presettle = true;        break;
+        case 'M': add_annotations = true;  break;
+        case 'E': drop_connection = true;  break;
+
+        default:
+            usage();
+            break;
+        }
+    }
+
+    signal(SIGQUIT, signal_handler);
+    signal(SIGINT,  signal_handler);
+
+    char *host = host_address;
+    if (strncmp(host, "amqp://", 7) == 0)
+        host += 7;
+    char *port = strrchr(host, ':');
+    if (port) {
+        *port++ = 0;
+    } else {
+        port = "5672";
+    }
+
+    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
+    pn_handler_add(handler, pn_handshaker());
+
+    reactor = pn_reactor();
+    pn_conn = pn_reactor_connection_to_host(reactor,
+                                            host,
+                                            port,
+                                            handler);
+
+    // the container name should be unique for each client
+    pn_connection_set_container(pn_conn, container_name);
+    pn_connection_set_hostname(pn_conn, host);
+
+    // break out of pn_reactor_process once a second to check if done
+    pn_reactor_set_timeout(reactor, 1000);
+
+    pn_reactor_start(reactor);
+
+    time_t last_log = 0;
+    while (pn_reactor_process(reactor)) {
+        if (stop) {
+            if (drop_connection) {  // hard stop
+                fprintf(stdout,
+                        "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                        " Released:%"PRIu64" Modified:%"PRIu64"\n",
+                        count, accepted, rejected, released, modified);
+                exit(0);
+            }
+
+            // wait (forever) until all sent messages are confirmed by the
+            // receiver
+
+            if (count == acked) {
+                // close the endpoints this will cause pn_reactor_process() to
+                // eventually break the loop
+                if (pn_link) pn_link_close(pn_link);
+                if (pn_ssn) pn_session_close(pn_ssn);
+                if (pn_conn) pn_connection_close(pn_conn);
+            } else {
+                // periodically give status for test output logs
+                time_t now = time(NULL);
+                if ((now - last_log) >= 1) {
+                    fprintf(stdout,
+                            "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                            " Released:%"PRIu64" Modified:%"PRIu64"\n",
+                            count, accepted, rejected, released, modified);
+                    last_log = now;
+                }
+            }
+        }
+    }
+
+    if (pn_link) pn_link_free(pn_link);
+    if (pn_ssn) pn_session_free(pn_ssn);
+    if (pn_conn) pn_connection_close(pn_conn);
+
+    pn_reactor_free(reactor);
+
+    return 0;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org