You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/09/18 15:27:43 UTC
[qpid-dispatch] branch master updated: DISPATCH-1406: Add system
test for the DISPATCH-1406 issue.
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 759dfd6 DISPATCH-1406: Add system test for the DISPATCH-1406 issue.
759dfd6 is described below
commit 759dfd6be57c2d32a371b1529194831356de61c5
Author: Kenneth Giusti <kg...@redhat.com>
AuthorDate: Fri Sep 6 13:54:42 2019 -0400
DISPATCH-1406: Add system test for the DISPATCH-1406 issue.
This closes #564
---
tests/CMakeLists.txt | 8 +
tests/system_test.py | 104 ++++++++-
tests/system_tests_edge_router.py | 2 +-
tests/system_tests_link_routes.py | 4 +-
tests/system_tests_router_mesh.py | 215 ++++++++++++++++++
tests/test-receiver.c | 233 ++++++++++++++++++++
tests/test-sender.c | 450 ++++++++++++++++++++++++++++++++++++++
7 files changed, 1001 insertions(+), 15 deletions(-)
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2fcff52..7b2d03b 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -52,6 +52,13 @@ set(unit_test_size_SOURCES
add_executable(unit_tests_size ${unit_test_size_SOURCES})
target_link_libraries(unit_tests_size qpid-dispatch)
+add_executable(test-sender test-sender.c)
+target_link_libraries(test-sender ${Proton_LIBRARIES})
+
+add_executable(test-receiver test-receiver.c)
+target_link_libraries(test-receiver ${Proton_LIBRARIES})
+
+
set(TEST_WRAP ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/run.py)
add_test(unit_tests_size_10000 ${TEST_WRAP} unit_tests_size 10000)
@@ -127,6 +134,7 @@ foreach(py_test_module
system_tests_multi_phase
system_tests_multicast
system_tests_fallback_dest
+ system_tests_router_mesh
)
add_test(${py_test_module} ${TEST_WRAP} unit2 -v ${py_test_module})
diff --git a/tests/system_test.py b/tests/system_test.py
index f099560..366f63f 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -47,12 +47,15 @@ import uuid
import proton
from proton import Message
+from proton import Delivery
from proton.handlers import MessagingHandler
from proton.utils import BlockingConnection
from proton.reactor import AtLeastOnce, Container
from qpid_dispatch.management.client import Node
from qpid_dispatch_internal.compat import dict_iteritems
+is_python2 = sys.version_info[0] == 2
+
# Optional modules
MISSING_MODULES = []
@@ -268,6 +271,39 @@ class Process(subprocess.Popen):
if self.expect != None and self.expect != status:
error("exit code %s, expected %s" % (status, self.expect))
+ def wait(self, timeout=None):
+ """
+ Add support for a timeout when using Python 2
+ """
+ if timeout is None:
+ return super(Process, self).wait()
+
+ if is_python2:
+ start = time.time()
+ while True:
+ rc = super(Process, self).poll()
+ if rc is not None:
+ return rc
+ if time.time() - start >= timeout:
+ raise Exception("Process did not terminate")
+ time.sleep(0.1)
+ else:
+ return super(Process, self).wait(timeout=timeout)
+
+ def communicate(self, input=None, timeout=None):
+ """
+ Add support for a timeout when using Python 2
+ """
+ if timeout is None:
+ return super(Process, self).communicate(input=input)
+
+ if is_python2:
+ self.wait(timeout=timeout)
+ return super(Process, self).communicate(input=input)
+
+ return super(Process, self).communicate(input=input,
+ timeout=timeout)
+
class Config(object):
"""Base class for configuration objects that provide a convenient
@@ -798,13 +834,24 @@ class AsyncTestSender(MessagingHandler):
A simple sender that runs in the background and sends 'count' messages to a
given target.
"""
- def __init__(self, address, target, count=1, body=None, container_id=None):
- super(AsyncTestSender, self).__init__()
+    class TestSenderException(Exception):
+        """
+        Raised when the sender's 'error' attribute has been set; the
+        exception carries that error text.
+        """
+        def __init__(self, error=None):
+            super(AsyncTestSender.TestSenderException, self).__init__(error)
+
+ def __init__(self, address, target, count=1, message=None, container_id=None):
+ super(AsyncTestSender, self).__init__(auto_accept=False,
+ auto_settle=False)
self.address = address
self.target = target
- self.count = count
- self._unaccepted = count
- self._body = body or "test"
+ self.total = count
+ self.accepted = 0
+ self.released = 0
+ self.modified = 0
+ self.rejected = 0
+ self.sent = 0
+ self.error = None
+
+ self._message = message or Message(body="test")
self._container = Container(self)
cid = container_id or "ATS-%s:%s" % (target, uuid.uuid4())
self._container.container_id = cid
@@ -821,6 +868,8 @@ class AsyncTestSender(MessagingHandler):
# don't stop it - wait until everything is sent
self._thread.join(timeout=TIMEOUT)
assert not self._thread.is_alive(), "sender did not complete"
+ if self.error:
+ raise AsyncTestSender.TestSenderException(self.error)
def on_start(self, event):
self._conn = self._container.connect(self.address)
@@ -831,16 +880,47 @@ class AsyncTestSender(MessagingHandler):
options=AtLeastOnce())
def on_sendable(self, event):
- if self.count:
- self._sender.send(Message(body=self._body))
- self.count -= 1
-
- def on_accepted(self, event):
- self._unaccepted -= 1;
- if self._unaccepted == 0:
+ if self.sent < self.total:
+ self._sender.send(self._message)
+ self.sent += 1
+
+ def _check_if_done(self):
+ done = (self.sent == self.total
+ and (self.accepted + self.released + self.modified
+ + self.rejected) == self.sent)
+ if done:
self._conn.close()
self._conn = None
+ def on_accepted(self, event):
+ self.accepted += 1;
+ event.delivery.settle()
+ self._check_if_done()
+
+ def on_released(self, event):
+ # for some reason Proton 'helpfully' calls on_released even though the
+ # delivery state is actually MODIFIED
+ if event.delivery.remote_state == Delivery.MODIFIED:
+ return self.on_modified(event)
+ self.released += 1
+ event.delivery.settle()
+ self._check_if_done()
+
+    def on_modified(self, event):
+        """
+        Count a MODIFIED outcome, settle the delivery and check whether
+        every sent message now has an outcome.
+        """
+        self.modified += 1
+        event.delivery.settle()
+        self._check_if_done()
+
+    def on_rejected(self, event):
+        """
+        Count a REJECTED outcome, settle the delivery and check whether
+        every sent message now has an outcome.
+        """
+        self.rejected += 1
+        event.delivery.settle()
+        self._check_if_done()
+
+ def on_link_error(self, event):
+ self.error = "link error:%s" % str(event.link.remote_condition)
+ self._conn.close()
+ self._conn = None
+
class QdManager(object):
"""
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index e98c6ef..ee89b02 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -1503,7 +1503,7 @@ class LinkRouteProxyTest(TestCase):
rx = AsyncTestReceiver(self.EB1.listener, 'CfgLinkRoute1/foo',
wait=False, recover_link=True)
tx = AsyncTestSender(self.EA1.listener, 'CfgLinkRoute1/foo',
- body="HEY HO LET'S GO!")
+ message=Message(body="HEY HO LET'S GO!"))
tx.wait()
msg = rx.queue.get(timeout=TIMEOUT)
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 1c59d9a..33658b4 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -1820,7 +1820,7 @@ class ConnectionLinkRouteTest(TestCase):
s = AsyncTestSender(self.QDR_A.addresses[0],
"flea.B",
container_id="flea.BSender",
- body="SENDING TO flea.B",
+ message=Message(body="SENDING TO flea.B"),
count=COUNT)
s.wait() # for sender to complete
for i in range(COUNT):
@@ -1836,7 +1836,7 @@ class ConnectionLinkRouteTest(TestCase):
s = AsyncTestSender(self.QDR_B.addresses[0],
"flea.A",
container_id="flea.ASender",
- body="SENDING TO flea.A",
+ message=Message(body="SENDING TO flea.A"),
count=COUNT)
s.wait()
for i in range(COUNT):
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
new file mode 100644
index 0000000..fad0600
--- /dev/null
+++ b/tests/system_tests_router_mesh.py
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+import sys
+from time import sleep
+import unittest2 as unittest
+from signal import SIGINT
+from subprocess import PIPE
+
+from proton import Message
+
+from system_test import TestCase
+from system_test import Qdrouterd
+from system_test import QdManager
+from system_test import main_module
+from system_test import TIMEOUT
+from system_test import Process
+from system_test import AsyncTestSender
+
+
+class ThreeRouterTest(TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ """
+ Create a mesh of three routers. Reject any links or messages sent to
+ an unavailable address.
+ """
+ super(ThreeRouterTest, cls).setUpClass()
+
+ def router(name, extra_config):
+ config = [
+
+ ('router', {'id': name,
+ 'mode': 'interior',
+ "defaultDistribution": "unavailable"}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port(),
+ "linkCapacity": '100'}),
+
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ] + extra_config
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+ cls.routers = []
+
+ inter_router_A = cls.tester.get_port()
+ inter_router_B = cls.tester.get_port()
+ inter_router_C = cls.tester.get_port()
+
+ router('RouterA',
+ [('listener', {'role': 'inter-router', 'port': inter_router_A}),
+ ('connector', {'role': 'inter-router', 'port': inter_router_B})])
+
+ router('RouterB',
+ [('listener', {'role': 'inter-router', 'port': inter_router_B}),
+ ('connector', {'role': 'inter-router', 'port': inter_router_C})])
+
+ router('RouterC',
+ [('listener', {'role': 'inter-router', 'port': inter_router_C}),
+ ('connector', {'role': 'inter-router', 'port': inter_router_A})])
+
+ cls.RouterA = cls.routers[0]
+ cls.RouterB = cls.routers[1]
+ cls.RouterC = cls.routers[2]
+
+ cls.RouterA.wait_router_connected('RouterB')
+ cls.RouterA.wait_router_connected('RouterC')
+ cls.RouterB.wait_router_connected('RouterA')
+ cls.RouterB.wait_router_connected('RouterC')
+ cls.RouterC.wait_router_connected('RouterA')
+ cls.RouterC.wait_router_connected('RouterB')
+
+ def server_address(self, router):
+ return router.addresses[0]
+
+ def server_port(self, router):
+ return router.ports[0] # first listener is for client connection
+
+ def server_host(self, router):
+ fam = router.ports_family
+ return router.get_host(fam.get(self.server_port(router),
+ "IPv4"))
+
+ def spawn_receiver(self, router, count, address, *extra_args):
+ cmd = ["test-receiver",
+ "-a", "%s:%s" % (self.server_host(router),
+ self.server_port(router)),
+ "-c", str(count), "-s", address] + list(extra_args)
+ # env = dict(os.environ, PN_TRACE_FRM="1")
+ # return self.popen(cmd, expect=Process.EXIT_OK, env=env)
+ return self.popen(cmd, expect=Process.EXIT_OK)
+
+ def spawn_sender(self, router, count, address, *extra_args):
+ cmd = ["test-sender",
+ "-a", "%s:%s" % (self.server_host(router),
+ self.server_port(router)),
+ "-c", str(count), "-t", address] + list(extra_args)
+ # env = dict(os.environ, PN_TRACE_FRM="1")
+ # return self.popen(cmd, expect=Process.EXIT_OK, env=env)
+ return self.popen(cmd, expect=Process.EXIT_OK)
+
+ def _rx_failover(self, extra_tx_args=None, extra_rx_args=None):
+ # Have a single sender transmit unsettled as fast as possible
+ # non-stop. Have a single receiver that consumes a small number of
+ # messages before failing over to a different router in the mesh
+ extra_tx = extra_tx_args or []
+ extra_rx = extra_rx_args or []
+ total = 100
+ router_index = 0
+ tx = self.spawn_sender(self.RouterC, 0, "balanced/foo", *extra_tx)
+ while total > 0:
+ rx = self.spawn_receiver(self.routers[router_index], 5,
+ "balanced/foo", *extra_rx)
+ if rx.wait(timeout=TIMEOUT):
+ raise Exception("Receiver failed to consume all messages")
+ total -= 5
+ router_index += 1
+ if router_index == len(self.routers):
+ router_index = 0
+ tx.send_signal(SIGINT)
+ out_text, out_err = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s"
+ % (out_text, out_err))
+
+ def test_01_rx_failover_clean(self):
+ """
+ Runs the receiver failover test. In this test the receiver cleanly
+ shuts down the AMQP endpoint before failing over.
+ """
+ self._rx_failover()
+
+
+ def test_02_rx_failover_dirty(self):
+ """
+ Runs the receiver failover test. In this test the receiver abruptly
+ drops the TCP connection simulating a client crash.
+ """
+ tcp_drop = ["-E"]
+ self._rx_failover(extra_rx_args=tcp_drop)
+
+ def test_03_unavailable_link_attach(self):
+ """
+ Attempt to attach a link to an unavailable address, expect the router
+ to detach it
+ """
+ ats = AsyncTestSender(self.server_address(self.RouterA),
+ "an/unavailable/address")
+ try:
+ ats.wait()
+ self.assertTrue(False) # expect exception
+ except AsyncTestSender.TestSenderException as exc:
+ self.assertTrue("link error" in ats.error)
+
+ def test_04_unavailable_anonymous_link_attach(self):
+ """
+ Attempt to attach an anonymous link and send a message to an
+ unavailable address. Expect to allow the link, but REJECT the message
+ """
+ message = Message(body="REJECTED!!!")
+ message.address = "another/unavailable/address"
+ ats = AsyncTestSender(self.server_address(self.RouterA),
+ target=None,
+ message=message)
+ ats.wait()
+ self.assertEqual(0, ats.accepted)
+ self.assertEqual(1, ats.rejected)
+
+ def test_05_unavailable_anonymous_link_send(self):
+ """
+ Attach an anonymous link and send to a configured address (no
+ subscribers). Expect to allow the link, but RELEASE the message
+ """
+ message = Message(body="Release me, let me go...")
+ message.address = "closest/foo"
+ ats = AsyncTestSender(self.server_address(self.RouterA),
+ target=None,
+ message=message)
+ ats.wait()
+ self.assertEqual(0, ats.accepted)
+ # BUG DISPATCH-1418: shold be released!
+ # self.assertEqual(1, ats.released)
+ self.assertEqual(1, ats.rejected)
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
new file mode 100644
index 0000000..8ae189c
--- /dev/null
+++ b/tests/test-receiver.c
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+
+#define MAX_SIZE (1024 * 64)
+char in_buffer[MAX_SIZE];
+
+bool stop = false;
+
+int credit_window = 1000;
+char *source_address = "test-address"; // name of the source node to receive from
+char _addr[] = "127.0.0.1:5672";
+char *host_address = _addr;
+char *container_name = "TestReceiver";
+bool drop_connection = false;
+
+pn_connection_t *pn_conn;
+pn_session_t *pn_ssn;
+pn_link_t *pn_link;
+pn_reactor_t *reactor;
+pn_message_t *in_message; // holds the current received message
+
+uint64_t count = 0;
+uint64_t limit = 0; // if > 0 stop after limit messages arrive
+
+
+// SIGINT/SIGQUIT handler: request a stop of the main loop.
+//
+static void signal_handler(int signum)
+{
+    // one request is enough: ignore any further SIGINT/SIGQUIT
+    signal(SIGINT, SIG_IGN);
+    signal(SIGQUIT, SIG_IGN);
+
+    if (signum == SIGINT || signum == SIGQUIT) {
+        stop = true;
+    }
+}
+
+
+// Finalizer registered with pn_handler_new(): releases the message
+// allocated on PN_CONNECTION_INIT, if any.
+//
+static void delete_handler(pn_handler_t *handler)
+{
+    if (!in_message)
+        return;
+
+    pn_message_free(in_message);
+    in_message = NULL;
+}
+
+
+/* Reactor event callback.
+ *
+ * PN_CONNECTION_INIT: open the connection, session and receiving link and
+ * grant the initial credit.
+ * PN_DELIVERY: accept and settle each fully received message, replenish
+ * credit, and request a stop once 'limit' messages have arrived.
+ * All other events are ignored.
+ */
+static void event_handler(pn_handler_t *handler,
+                          pn_event_t *event,
+                          pn_event_type_t type)
+{
+    if (type == PN_CONNECTION_INIT) {
+        // Create and open all the endpoints needed to receive messages
+        //
+        in_message = pn_message();
+        pn_connection_open(pn_conn);
+        pn_ssn = pn_session(pn_conn);
+        pn_session_open(pn_ssn);
+        pn_link = pn_receiver(pn_ssn, "MyReceiver");
+        pn_terminus_set_address(pn_link_source(pn_link), source_address);
+        pn_link_open(pn_link);
+        // cannot receive without granting credit:
+        pn_link_flow(pn_link, credit_window);
+        return;
+    }
+
+    if (type != PN_DELIVERY)
+        return;
+
+    if (stop)
+        return;  // silently discard any further messages
+
+    pn_delivery_t *dlv = pn_event_delivery(event);
+    if (!pn_delivery_readable(dlv) || pn_delivery_partial(dlv))
+        return;  // nothing to read yet, or the message is still incomplete
+
+    // A full message has arrived
+    count += 1;
+    pn_delivery_update(dlv, PN_ACCEPTED);
+    pn_delivery_settle(dlv);  // dlv is now freed
+
+    int credit = pn_link_credit(pn_link);
+    if (credit <= credit_window/2) {
+        // Grant enough credit to bring it back up to credit_window:
+        pn_link_flow(pn_link, credit_window - credit);
+    }
+
+    if (limit && count == limit) {
+        stop = true;
+        pn_reactor_wakeup(reactor);
+    }
+}
+
+// Print the command line help, showing the current value of each option in
+// brackets, then exit with status 1.
+//
+static void usage(void)
+{
+    printf("Usage: receiver <options>\n"
+           "-a \tThe address:port of the server [%s]\n"
+           "-c \tExit after N messages arrive (0 == run forever) [%"PRIu64"]\n"
+           "-i \tContainer name [%s]\n"
+           "-s \tSource address [%s]\n"
+           "-w \tCredit window [%d]\n"
+           "-E \tExit without cleanly closing the connection [off]\n",
+           host_address,
+           limit,
+           container_name,
+           source_address,
+           credit_window);
+    exit(1);
+}
+
+
+/* Entry point of the test receiver: parse the command line, connect to the
+ * server and run the reactor until it has nothing more to process.
+ * Returns 0 on normal completion; usage() exits with status 1 on a bad
+ * option.
+ */
+int main(int argc, char** argv)
+{
+    /* create a handler for the connection's events.
+     */
+    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
+    pn_handler_add(handler, pn_handshaker());
+
+    /* command line options */
+    opterr = 0;
+    int c;
+    while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
+        switch(c) {
+        case 'h': usage(); break;
+        case 'a': host_address = optarg; break;
+        case 'c':
+            if (sscanf(optarg, "%"PRIu64, &limit) != 1)
+                usage();
+            break;
+        case 'i': container_name = optarg; break;
+        case 's': source_address = optarg; break;
+        case 'w':
+            // the credit window must be a positive integer
+            if (sscanf(optarg, "%d", &credit_window) != 1 || credit_window <= 0)
+                usage();
+            break;
+        case 'E': drop_connection = true; break;
+
+        default:
+            usage();
+            break;
+        }
+    }
+
+    signal(SIGQUIT, signal_handler);
+    signal(SIGINT, signal_handler);
+
+    // split "[amqp://]host[:port]" in place; the port defaults to 5672
+    char *host = host_address;
+    if (strncmp(host, "amqp://", 7) == 0)
+        host += 7;
+    char *port = strrchr(host, ':');
+    if (port) {
+        *port++ = 0;  // terminate the host part, port now follows the ':'
+    } else {
+        port = "5672";
+    }
+
+    reactor = pn_reactor();
+    pn_conn = pn_reactor_connection_to_host(reactor,
+                                            host,
+                                            port,
+                                            handler);
+
+    // the container name should be unique for each client
+    pn_connection_set_container(pn_conn, container_name);
+    pn_connection_set_hostname(pn_conn, host);
+
+    // periodic wakeup
+    pn_reactor_set_timeout(reactor, 1000);
+
+    pn_reactor_start(reactor);
+
+    while (pn_reactor_process(reactor)) {
+        // 'stop' is set by the signal handler or by event_handler() once
+        // 'limit' messages have arrived
+        if (stop) {
+            if (drop_connection)  // hard exit
+                exit(0);
+            // close the endpoints this will cause pn_reactor_process() to
+            // eventually break the loop
+            if (pn_link) pn_link_close(pn_link);
+            if (pn_ssn) pn_session_close(pn_ssn);
+            if (pn_conn) pn_connection_close(pn_conn);
+        }
+    }
+
+    if (pn_link) pn_link_free(pn_link);
+    if (pn_ssn) pn_session_free(pn_ssn);
+    if (pn_conn) pn_connection_close(pn_conn);
+
+    pn_reactor_free(reactor);
+
+    return 0;
+}
diff --git a/tests/test-sender.c b/tests/test-sender.c
new file mode 100644
index 0000000..92e182a
--- /dev/null
+++ b/tests/test-sender.c
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define ADD_ANNOTATIONS 1
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+#define BOOL2STR(b) ((b)?"true":"false")
+
+#define BODY_SIZE_SMALL 100
+#define BODY_SIZE_MEDIUM 2000
+#define BODY_SIZE_LARGE 60000 // NOTE: receiver.c max in buffer size = 64KB
+
+// body data - block of 0's
+//
+char _payload[BODY_SIZE_LARGE] = {0};
+pn_bytes_t body_data = {
+ .size = 0,
+ .start = _payload,
+};
+
+bool stop = false;
+
+uint64_t limit = 1; // # messages to send
+uint64_t count = 0; // # sent
+uint64_t acked = 0; // # of received acks
+
+// outcome counts
+uint64_t accepted = 0;
+uint64_t rejected = 0;
+uint64_t modified = 0;
+uint64_t released = 0;
+
+
+bool use_anonymous = false; // use anonymous link if true
+bool presettle = false; // true = send presettled
+bool add_annotations = false;
+int body_size = BODY_SIZE_SMALL;
+bool drop_connection = false;
+
+// buffer for encoded message
+char *encode_buffer = NULL;
+size_t encode_buffer_size = 0; // size of malloced memory
+size_t encoded_data_size = 0; // length of encoded content
+
+
+char *target_address = "test-address";
+char _addr[] = "127.0.0.1:5672";
+char *host_address = _addr;
+char *container_name = "TestSender";
+
+pn_connection_t *pn_conn;
+pn_session_t *pn_ssn;
+pn_link_t *pn_link;
+pn_reactor_t *reactor;
+pn_message_t *out_message;
+
+
+// odd-length long string
+const char big_string[] =
+ "+"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
+
+
+// Append the NUL-terminated 'text' (terminator excluded) as a symbol
+static void anno_put_symbol(pn_data_t *data, const char *text)
+{
+    pn_data_put_symbol(data, pn_bytes(strlen(text), text));
+}
+
+
+// Append the NUL-terminated 'text' (terminator excluded) as a string
+static void anno_put_string(pn_data_t *data, const char *text)
+{
+    pn_data_put_string(data, pn_bytes(strlen(text), text));
+}
+
+
+// Replace the Message Annotations section of out_message with a map of
+// dummy entries covering several value types, including a nested map and
+// a nested list.
+//
+static void add_message_annotations(pn_message_t *out_message)
+{
+    // just a bunch of dummy MA
+    pn_data_t *ma = pn_message_annotations(out_message);
+    pn_data_clear(ma);
+    pn_data_put_map(ma);
+    pn_data_enter(ma);
+
+    anno_put_symbol(ma, "my-key");
+    anno_put_string(ma, "my-data");
+
+    anno_put_symbol(ma, "my-other-key");
+    anno_put_string(ma, "my-other-data");
+
+    // embedded map
+    anno_put_symbol(ma, "my-map");
+    pn_data_put_map(ma);
+    pn_data_enter(ma);
+    anno_put_symbol(ma, "my-map-key1");
+    pn_data_put_char(ma, 'X');
+    anno_put_symbol(ma, "my-map-key2");
+    pn_data_put_byte(ma, 0x12);
+    anno_put_symbol(ma, "my-map-key3");
+    anno_put_string(ma, "Are We Not Men?");
+    anno_put_symbol(ma, "my-last-key");
+    // sizeof (not strlen): the terminating NUL is part of the value
+    pn_data_put_binary(ma, pn_bytes(sizeof(big_string), big_string));
+    pn_data_exit(ma);
+
+    anno_put_symbol(ma, "my-ulong");
+    pn_data_put_ulong(ma, 0xDEADBEEFCAFEBEEF);
+
+    // embedded list
+    anno_put_symbol(ma, "my-list");
+    pn_data_put_list(ma);
+    pn_data_enter(ma);
+    // sizeof (not strlen): the terminating NUL is part of the value
+    pn_data_put_string(ma, pn_bytes(sizeof(big_string), big_string));
+    pn_data_put_double(ma, 3.1415);
+    pn_data_put_short(ma, 1966);
+    pn_data_exit(ma);
+
+    anno_put_symbol(ma, "my-bool");
+    pn_data_put_bool(ma, false);
+
+    pn_data_exit(ma);
+}
+
+
+// Build the outgoing message once and encode it into encode_buffer.
+//
+// The body is body_size bytes of binary data.  The message address is set
+// only when an anonymous link is used, and the dummy Message Annotations
+// are added when requested.  On return encode_buffer holds the encoded
+// message and encoded_data_size its length.  The process exits with an
+// error message if memory cannot be allocated or encoding fails.
+//
+void generate_message(void)
+{
+    if (!out_message) {
+        out_message = pn_message();
+    }
+
+    if (use_anonymous) {
+        // with an anonymous link the destination travels in the message
+        pn_message_set_address(out_message, target_address);
+    }
+
+    pn_data_t *body = pn_message_body(out_message);
+    pn_data_clear(body);
+    body_data.size = body_size;
+    pn_data_put_binary(body, body_data);
+
+    if (add_annotations) {
+        add_message_annotations(out_message);
+    }
+
+    // now encode it
+
+    pn_data_rewind(pn_message_body(out_message));
+    if (!encode_buffer) {
+        // initial guess: the body plus some room for the other sections
+        encode_buffer_size = body_size + 512;
+        encode_buffer = malloc(encode_buffer_size);
+    }
+
+    int rc = 0;
+    size_t len = 0;
+    for (;;) {
+        if (!encode_buffer) {
+            fprintf(stderr, "cannot allocate %zu byte encode buffer\n",
+                    encode_buffer_size);
+            exit(-1);
+        }
+
+        len = encode_buffer_size;
+        rc = pn_message_encode(out_message, encode_buffer, &len);
+        if (rc != PN_OVERFLOW)
+            break;
+
+        // buffer too small: double it and try again
+        free(encode_buffer);
+        encode_buffer_size *= 2;
+        encode_buffer = malloc(encode_buffer_size);
+    }
+
+    if (rc) {
+        // rc is a Proton error code; errno is not meaningful here
+        fprintf(stderr, "buffer encode failed: error code %d\n", rc);
+        exit(-1);
+    }
+
+    encoded_data_size = len;
+}
+
+
+// SIGINT/SIGQUIT handler: request a stop of the main loop.
+//
+static void signal_handler(int signum)
+{
+    // one request is enough: ignore any further SIGINT/SIGQUIT
+    signal(SIGINT, SIG_IGN);
+    signal(SIGQUIT, SIG_IGN);
+
+    if (signum == SIGINT || signum == SIGQUIT) {
+        stop = true;
+    }
+}
+
+
+// Finalizer registered with pn_handler_new(): releases the encode buffer
+// and the outgoing message.  The globals are reset so that no dangling
+// pointer remains (generate_message() tests them for NULL).
+//
+static void delete_handler(pn_handler_t *handler)
+{
+    free(encode_buffer);
+    encode_buffer = NULL;
+    encode_buffer_size = 0;
+    encoded_data_size = 0;
+
+    if (out_message) {
+        pn_message_free(out_message);
+        out_message = NULL;
+    }
+}
+
+
+/* Reactor event callback.
+ *
+ * PN_CONNECTION_INIT: open the connection, session and sending link and
+ * encode the message to be sent.
+ * PN_LINK_FLOW: send as many messages as credit and 'limit' allow.
+ * PN_DELIVERY: count the outcome reported by the peer and settle the
+ * delivery; request a stop once 'limit' messages have an outcome.
+ * All other events are ignored.
+ */
+static void event_handler(pn_handler_t *handler,
+                          pn_event_t *event,
+                          pn_event_type_t type)
+{
+    switch (type) {
+
+    case PN_CONNECTION_INIT: {
+        // Create and open all the endpoints needed to send a message.
+        // The session and link are stored in the file-scope pn_ssn and
+        // pn_link (not in locals shadowing them) so that main() can close
+        // and free them on shutdown.
+        //
+        pn_connection_open(pn_conn);
+        pn_ssn = pn_session(pn_conn);
+        pn_session_open(pn_ssn);
+        pn_link = pn_sender(pn_ssn, "MySender");
+        if (!use_anonymous) {
+            pn_terminus_set_address(pn_link_target(pn_link), target_address);
+        }
+        pn_link_open(pn_link);
+
+        acked = count;
+        generate_message();
+
+    } break;
+
+    case PN_LINK_FLOW: {
+        // the remote has given us some credit, now we can send messages
+        //
+        static long tag = 0;  // a simple tag generator
+        pn_link_t *sender = pn_event_link(event);
+        int credit = pn_link_credit(sender);
+
+        // limit == 0 means send nonstop
+        while (!stop && credit > 0 && (limit == 0 || count < limit)) {
+            --credit;
+            ++count;
+            ++tag;
+            pn_delivery_t *delivery = pn_delivery(sender, pn_dtag((const char *)&tag, sizeof(tag)));
+            pn_link_send(sender, encode_buffer, encoded_data_size);
+            pn_link_advance(sender);
+            if (presettle) {
+                pn_delivery_settle(delivery);
+                // fake terminal outcome
+                ++acked;
+                ++accepted;
+                if (limit && count == limit) {
+                    // no need to wait for acks
+                    stop = true;
+                    pn_reactor_wakeup(reactor);
+                }
+            }
+        }
+    } break;
+
+    case PN_DELIVERY: {
+        pn_delivery_t *dlv = pn_event_delivery(event);
+        if (pn_delivery_updated(dlv)) {
+            uint64_t rs = pn_delivery_remote_state(dlv);
+            switch (rs) {
+            case PN_RECEIVED:
+                // This is not a terminal state - it is informational, and the
+                // peer is still processing the message.
+                break;
+            case PN_ACCEPTED:
+                ++acked;
+                ++accepted;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_REJECTED:
+                ++acked;
+                ++rejected;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_RELEASED:
+                ++acked;
+                ++released;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_MODIFIED:
+                ++acked;
+                ++modified;
+                pn_delivery_settle(dlv);
+                break;
+
+            default:
+                break;
+            }
+
+            if (limit && acked == limit) {
+                // initiate clean shutdown of the endpoints
+                stop = true;
+                pn_reactor_wakeup(reactor);
+            }
+        }
+    } break;
+
+    default:
+        break;
+    }
+}
+
+// Print the command line help, showing the current value of each option in
+// brackets, then exit with status 1.
+//
+static void usage(void)
+{
+    printf("Usage: sender <options>\n"
+           "-a \tThe address:port of the server [%s]\n"
+           "-c \t# of messages to send, 0 == nonstop [%"PRIu64"]\n"
+           "-i \tContainer name [%s]\n"
+           "-n \tUse an anonymous link [%s]\n"
+           "-s \tBody size in bytes ('s'=%d 'm'=%d 'l'=%d) [%d]\n"
+           "-t \tTarget address [%s]\n"
+           "-u \tSend all messages presettled [%s]\n"
+           "-M \tAdd dummy Message Annotations section [off]\n"
+           "-E \tExit without cleanly closing the connection [off]\n",
+           host_address,
+           limit,
+           container_name,
+           BOOL2STR(use_anonymous),
+           BODY_SIZE_SMALL, BODY_SIZE_MEDIUM, BODY_SIZE_LARGE, body_size,
+           target_address,
+           BOOL2STR(presettle));
+    exit(1);
+}
+
+/* Entry point of the test sender: parse the command line, connect to the
+ * server and run the reactor until it has nothing more to process.
+ * Returns 0 on normal completion; usage() exits with status 1 on a bad
+ * option.
+ */
+int main(int argc, char** argv)
+{
+    /* command line options */
+    opterr = 0;
+    int c;
+    while ((c = getopt(argc, argv, "ha:c:i:ns:t:uME")) != -1) {
+        switch(c) {
+        case 'h': usage(); break;
+        case 'a': host_address = optarg; break;
+        case 'c':
+            if (sscanf(optarg, "%"PRIu64, &limit) != 1)
+                usage();
+            break;
+        case 'i': container_name = optarg; break;
+        case 'n': use_anonymous = true; break;
+        case 's':
+            // only the first character of the argument selects the size
+            switch (optarg[0]) {
+            case 's': body_size = BODY_SIZE_SMALL; break;
+            case 'm': body_size = BODY_SIZE_MEDIUM; break;
+            case 'l': body_size = BODY_SIZE_LARGE; break;
+            default:
+                usage();
+            }
+            break;
+        case 't': target_address = optarg; break;
+        case 'u': presettle = true; break;
+        case 'M': add_annotations = true; break;
+        case 'E': drop_connection = true; break;
+
+        default:
+            usage();
+            break;
+        }
+    }
+
+    signal(SIGQUIT, signal_handler);
+    signal(SIGINT, signal_handler);
+
+    // split "[amqp://]host[:port]" in place; the port defaults to 5672
+    char *host = host_address;
+    if (strncmp(host, "amqp://", 7) == 0)
+        host += 7;
+    char *port = strrchr(host, ':');
+    if (port) {
+        *port++ = 0;  // terminate the host part, port now follows the ':'
+    } else {
+        port = "5672";
+    }
+
+    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
+    pn_handler_add(handler, pn_handshaker());
+
+    reactor = pn_reactor();
+    pn_conn = pn_reactor_connection_to_host(reactor,
+                                            host,
+                                            port,
+                                            handler);
+
+    // the container name should be unique for each client
+    pn_connection_set_container(pn_conn, container_name);
+    pn_connection_set_hostname(pn_conn, host);
+
+    // break out of pn_reactor_process once a second to check if done
+    pn_reactor_set_timeout(reactor, 1000);
+
+    pn_reactor_start(reactor);
+
+    time_t last_log = 0;  // time the status line was last printed
+    while (pn_reactor_process(reactor)) {
+        // 'stop' is set by the signal handler or by event_handler() once
+        // 'limit' messages have been sent (presettled) or acknowledged
+        if (stop) {
+            if (drop_connection) {  // hard stop
+                fprintf(stdout,
+                        "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                        " Released:%"PRIu64" Modified:%"PRIu64"\n",
+                        count, accepted, rejected, released, modified);
+                exit(0);
+            }
+
+            // wait (forever) until all sent messages are confirmed by the
+            // receiver
+
+            if (count == acked) {
+                // close the endpoints this will cause pn_reactor_process() to
+                // eventually break the loop
+                if (pn_link) pn_link_close(pn_link);
+                if (pn_ssn) pn_session_close(pn_ssn);
+                if (pn_conn) pn_connection_close(pn_conn);
+            } else {
+                // periodically give status for test output logs
+                // (at most one line per second)
+                time_t now = time(NULL);
+                if ((now - last_log) >= 1) {
+                    fprintf(stdout,
+                            "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                            " Released:%"PRIu64" Modified:%"PRIu64"\n",
+                            count, accepted, rejected, released, modified);
+                    last_log = now;
+                }
+            }
+        }
+    }
+
+    if (pn_link) pn_link_free(pn_link);
+    if (pn_ssn) pn_session_free(pn_ssn);
+    if (pn_conn) pn_connection_close(pn_conn);
+
+    pn_reactor_free(reactor);
+
+    return 0;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org