You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/27 19:07:27 UTC
[qpid-dispatch] 02/02: DISPATCH-1807: Add TCP protocol adaptor tests
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 2d074ac41800f18a9549f97b82e49911027607ec
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 15:04:06 2020 -0400
DISPATCH-1807: Add TCP protocol adaptor tests
Rewrite do-nothing tcp_adaptor test
* Mistakenly committed test cmake including it before it's ready
* It's still not ready but is finding some form
Improve echo server
* For messages larger than 100 bytes only print the first and
last 50 bytes to logs.
Improve echo client
* Send unique data for each message
* Improve logging
* Add timeout
* Don't use socket after closing it
---
tests/TCP_echo_client.py | 102 ++++++++++++++++++++++++---
tests/TCP_echo_server.py | 29 ++++++--
tests/system_tests_tcp_adaptor.py | 145 ++++++++++++++++++++++++++++++++++++++
3 files changed, 262 insertions(+), 14 deletions(-)
diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 83f1b44..5f2a687 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -24,12 +24,38 @@ import os
import selectors
import socket
import sys
+import time
import traceback
import types
from system_test import Logger
-def main_except(host, port, size, count, logger):
+
+class EchoLogger(Logger):
+ def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
+ self.prefix = prefix + ' ' if len(prefix) > 0 else ''
+ super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
+
+ def log(self, msg):
+ super(EchoLogger, self).log(self.prefix + msg)
+
+
+def split_chunk_for_display(raw_bytes):
+ """
+ Given some raw bytes, return a display string
+ Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+ :param raw_bytes:
+ :return: display string
+ """
+ MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too
+ if len(raw_bytes) > 2 * MAGIC_SIZE:
+ result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+ else:
+ result = repr(raw_bytes)
+ return result
+
+
+def main_except(host, port, size, count, timeout, logger):
'''
:param host: connect to this host
:param port: connect to this port
@@ -41,16 +67,36 @@ def main_except(host, port, size, count, logger):
:return:
'''
# Start up
+ start_time = time.time()
logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count))
keep_going = True
+ total_sent = 0
+ total_rcvd = 0
# outbound payload
payload_out = []
out_list_idx = 0 # current _out array being sent
out_byte_idx = 0 # next-to-send in current array
out_ready_to_send = True
- for i in range(count):
- payload_out.append(bytearray([i & 255] * size))
+ # Generate unique content for each message so you can tell where the message
+ # or fragment belongs in the whole stream. Chunks look like:
+ # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
+ # host: localhost
+ # port: 33333
+ # index: 6
+ # offset into message: 0
+ MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
+ for idx in range(count):
+ body_msg = ""
+ padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+ while len(body_msg) < size:
+ chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg))
+ padlen = MAGIC_SIZE - len(chunk)
+ chunk += padchar * padlen
+ body_msg += chunk
+ if len(body_msg) > size:
+ body_msg = body_msg[:size]
+ payload_out.append(bytearray(body_msg.encode()))
# incoming payloads
payload_in = []
in_list_idx = 0 # current _in array being received
@@ -70,11 +116,17 @@ def main_except(host, port, size, count, logger):
# event loop
while keep_going:
- for key, mask in sel.select(timeout=1):
+ if timeout > 0.0:
+ elapsed = time.time() - start_time
+ if elapsed > timeout:
+ logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd))
+ break
+ for key, mask in sel.select(timeout=0.1):
sock = key.fileobj
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data:
+ total_rcvd = len(recv_data)
payload_in[in_list_idx].extend(recv_data)
if len(payload_in[in_list_idx]) == size:
logger.log("Rcvd message %d" % in_list_idx)
@@ -82,6 +134,16 @@ def main_except(host, port, size, count, logger):
if in_list_idx == count:
# Received all bytes of all chunks - done.
keep_going = False
+ # Verify the received data
+ for idxc in range(count):
+ for idxs in range(size):
+ ob = payload_out[idxc][idxs]
+ ib = payload_in [idxc][idxs]
+ if ob != ib:
+ error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
+ % (idxc, idxs, repr(ob), repr(ib))
+ logger.log(error)
+ raise Exception(error)
else:
out_ready_to_send = True
sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
@@ -89,7 +151,7 @@ def main_except(host, port, size, count, logger):
error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \
(size, len(payload_in[in_list_idx]))
logger.log(error)
- keep_going = False
+ raise Exception(error)
else:
pass # still accumulating a message
else:
@@ -98,6 +160,7 @@ def main_except(host, port, size, count, logger):
if mask & selectors.EVENT_WRITE:
if out_ready_to_send:
n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] )
+ total_sent += n_sent
out_byte_idx += n_sent
if out_byte_idx == size:
logger.log("Sent message %d" % out_list_idx)
@@ -124,6 +187,10 @@ def main(argv):
help='Size of payload in bytes')
p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
help='Number of payloads to process')
+ p.add_argument('--name',
+ help='Optional logger prefix')
+ p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+ help='Timeout in seconds. Default value "0" disables timeouts')
p.add_argument('--log', '-l',
action='store_true',
help='Write activity log to console')
@@ -141,18 +208,35 @@ def main(argv):
port = args.port
# size
+ if args.size <= 0:
+ raise Exception("Size must be greater than zero")
size = args.size
# count
+ if args.count <= 0:
+ raise Exception("Count must be greater than zero")
count = args.count
+ # name / prefix
+ prefix = args.name if args.name is not None else "ECHO_CLIENT"
+
+ # timeout
+ if args.timeout < 0.0:
+ raise Exception("Timeout must be greater than or equal to zero")
+
# logging
- logger = Logger(title = "TCP_echo_client host:%s port %d size:%d count:%d" % (host, port, size, count),
- print_to_console = args.log,
- save_for_dump = False)
+ logger = EchoLogger(prefix=prefix,
+ title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
+ print_to_console = args.log,
+ save_for_dump = False)
- main_except(host, port, size, count, logger)
+ main_except(host, port, size, count, args.timeout, logger)
return 0
+
+ except KeyboardInterrupt:
+ logger.log("Exiting due to KeyboardInterrupt")
+ return 0
+
except Exception as e:
traceback.print_exc()
return 1
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 44aefd7..28fc2ae 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -63,6 +63,21 @@ class EchoLogger(Logger):
super(EchoLogger, self).log(self.prefix + msg)
+def split_chunk_for_display(raw_bytes):
+ """
+ Given some raw bytes, return a display string
+ Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+ :param raw_bytes:
+ :return: display string
+ """
+ MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too
+ if len(raw_bytes) > 2 * MAGIC_SIZE:
+ result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+ else:
+ result = repr(raw_bytes)
+ return result
+
+
def main_except(sock, port, echo_count, timeout, logger):
'''
:param lsock: socket to listen on
@@ -128,23 +143,26 @@ def do_service(key, mask, sel, logger):
recv_data = sock.recv(1024)
if recv_data:
data.outb += recv_data
- logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+ logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data),
+ split_chunk_for_display(recv_data)))
sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
else:
logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
sel.unregister(sock)
sock.close()
+ return 0
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
retval += sent
if sent > 0:
- logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
+ logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent,
+ split_chunk_for_display(data.outb[:sent])))
else:
logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
data.outb = data.outb[sent:]
else:
- logger.log('write event with no data' + str(data))
+ #logger.log('write event with no data' + str(data))
sel.modify(sock, selectors.EVENT_READ, data=data)
return retval
@@ -183,7 +201,7 @@ def main(argv):
# timeout
if args.timeout < 0.0:
- raise Exception("Timeout must be greater than zero")
+ raise Exception("Timeout must be greater than or equal to zero")
# logging
logger = EchoLogger(prefix = prefix,
@@ -197,9 +215,10 @@ def main(argv):
main_except(lsock, port, args.echo, args.timeout, logger)
except KeyboardInterrupt:
- pass
+ logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed)
except Exception as e:
+ logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed)
traceback.print_exc()
retval = 1
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
new file mode 100644
index 0000000..8120937
--- /dev/null
+++ b/tests/system_tests_tcp_adaptor.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+from time import sleep
+from threading import Event
+from threading import Timer
+
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import Logger
+from system_test import QdManager
+from system_test import unittest
+from system_test import Process
+from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeBroker
+from test_broker import FakeService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments
+from subprocess import PIPE, STDOUT
+import re
+
+class AddrTimer(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.check_address()
+
+
+class TcpAdaptorOneRouterEcho(TestCase):
+ """
+ Run echo tests through a stand-alone router
+ """
+ amqp_listener_port = None
+ tcp_client_listener_port = None
+ tcp_server_listener_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router and echo server"""
+ super(TcpAdaptorOneRouterEcho, cls).setUpClass()
+
+ def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None):
+ config = [
+ ('router', {'mode': mode, 'id': name}),
+ ('listener', {'port': l_amqp, 'stripAnnotations': 'no'}),
+ ('tcpConnector', {"host": "127.0.0.1",
+ "port": l_tcp_server,
+ "address": addr,
+ "siteId": site}),
+ ('tcpListener', {"host": "0.0.0.0",
+ "port": l_tcp_client,
+ "address": addr,
+ "siteId": site})
+ ]
+
+ if extra:
+ config.append(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ cls.amqp_listener_port = cls.tester.get_port()
+ cls.tcp_client_listener_port = cls.tester.get_port()
+ cls.tcp_server_listener_port = cls.tester.get_port()
+
+ router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
+ cls.tcp_server_listener_port, "some_address", "best_site")
+
+ cls.logger = Logger(title="TCP echo one router", print_to_console=True)
+
+ @classmethod
+ def tearDownClass(cls):
+ pass
+
+ def spawn_echo_server(self, port, expect=None):
+ cmd = ["TCP_echo_server.py",
+ "--port", str(port),
+ "--log"]
+ return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect,
+ universal_newlines=True)
+
+ def spawn_echo_client(self, logger, host, port, size, count, expect=None):
+ if expect is None:
+ expect = Process.EXIT_OK
+ cmd = ["TCP_echo_client.py",
+ "--host", host,
+ "--port", str(port),
+ "--size", str(size),
+ "--count", str(count),
+ "--log"]
+ logger.log("Start client. cmd=%s" % str(cmd))
+ return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect,
+ universal_newlines=True)
+
+ def do_test_echo(self, logger, host, port, size, count):
+ # start echo client
+ client = self.spawn_echo_client(logger, host, port, size, count)
+ cl_text, cl_error = client.communicate(timeout=TIMEOUT)
+ if client.returncode:
+ raise Exception("Echo client failed size:%d, count:%d : %s %s" %
+ (size, count, cl_text, cl_error))
+
+ def test_01_tcp_echo_one_router(self):
+ # start echo server
+ #server = self.spawn_echo_server(self.tcp_server_listener_port)
+
+ #for size in [1, 5, 10, 50, 100]:
+ # for count in [1, 5, 10, 50, 100]:
+ # self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" %
+ # (self.tcp_client_listener_port, size, count))
+ # self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count)
+ #server.join()
+ pass
+
+if __name__== '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org