You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/27 13:04:06 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1807: TCP self test - improve test server
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new 4aff90c DISPATCH-1807: TCP self test - improve test server
4aff90c is described below
commit 4aff90cbd9d31140da437ffdf2e3aca9a999e9a2
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 09:02:06 2020 -0400
DISPATCH-1807: TCP self test - improve test server
* Add timeout
* Add exit-on-byte-count
* Improve logging for merge with Scraper
---
tests/TCP_echo_server.py | 132 ++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 112 insertions(+), 20 deletions(-)
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 61e95a2..bdfecfe 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -22,8 +22,10 @@
import argparse
import os
import selectors
+from signal import signal, SIGINT
import socket
import sys
+import time
import traceback
import types
@@ -31,14 +33,50 @@ from system_test import Logger
HOST = '127.0.0.1'
-def main_except(port, logger):
+class ClientRecord(object):
+ """
+ Object to register with the selector 'data' field
+ for incoming user connections. This is *not* used
+ for the listening socket.
+ This object holds the socketId in the address and
+ the inbound and outbound data list buffers for this
+ socket's payload.
+ """
+ def __init__(self, address):
+ self.addr = address
+ self.inb = b''
+ self.outb = b''
+
+ def __repr__(self):
+ return str(self.addr) + " len(in)=" + str(len(self.inb)) + " len(out)=" + str(len(self.outb))
+
+ def __str__(self):
+ return self.__repr__()
+
+
+class EchoLogger(Logger):
+ def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
+ self.prefix = prefix + ' ' if len(prefix) > 0 else ''
+ super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
+
+ def log(self, msg):
+ super(EchoLogger, self).log(self.prefix + msg)
+
+
+def main_except(sock, port, echo_count, timeout, logger):
'''
+ :param lsock: socket to listen on
:param port: port to listen on
+ :param echo_count: exit after echoing this many bytes
+ :param timeout: exit after this many seconds
:param logger: Logger() object
:return:
'''
+ # set up spontaneous exit settings
+ start_time = time.time()
+ total_echoed = 0
+
# set up listening socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((HOST, port))
sock.listen()
sock.setblocking(False)
@@ -50,29 +88,48 @@ def main_except(port, logger):
# event loop
while True:
- events = sel.select(timeout=None)
- for key, mask in events:
- if key.data is None:
- do_accept(key.fileobj, sel, logger)
- else:
- do_service(key, mask, sel, logger)
+ if timeout > 0.0:
+ elapsed = time.time() - start_time
+ if elapsed > timeout:
+ logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed)
+ break
+ if echo_count > 0:
+ if total_echoed >= echo_count:
+ logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed)
+ break
+ events = sel.select(timeout=0.1)
+ if events:
+ for key, mask in events:
+ if key.data is None:
+ if key.fileobj is sock:
+ do_accept(key.fileobj, sel, logger)
+ else:
+ assert(False, "Only listener 'sock' has None in opaque data field")
+ else:
+ total_echoed += do_service(key, mask, sel, logger)
+ else:
+ pass # select timeout. probably.
+
+ sel.unregister(sock)
+ sock.close()
def do_accept(sock, sel, logger):
conn, addr = sock.accept()
logger.log('Accepted connection from %s:%d' % (addr[0], addr[1]))
conn.setblocking(False)
- data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
- sel.register(conn, events, data=data)
+ sel.register(conn, events, data=ClientRecord(addr))
def do_service(key, mask, sel, logger):
+ retval = 0
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data:
data.outb += recv_data
- logger.log('ECHO read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+ logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+ sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
else:
logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
sel.unregister(sock)
@@ -80,40 +137,75 @@ def do_service(key, mask, sel, logger):
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
+ retval += sent
if sent > 0:
- logger.log('ECHO write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
+ logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
else:
- logger.log('ECHO write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
+ logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
data.outb = data.outb[sent:]
-
+ else:
+ logger.log('write event with no data' + str(data))
+ sel.modify(sock, selectors.EVENT_READ, data=data)
+ return retval
def main(argv):
+ retval = 0
try:
# parse args
p = argparse.ArgumentParser()
p.add_argument('--port', '-p',
help='Required listening port number')
+ p.add_argument('--name',
+ help='Optional logger prefix')
+ p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
+ help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
+ p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+ help='Timeout in seconds. Default value "0" disables timeouts')
p.add_argument('--log', '-l',
action='store_true',
help='Write activity log to console')
del argv[0]
args = p.parse_args(argv)
+ lsock = None
+
# port
if args.port is None:
raise Exception("User must specify a port number")
port = int(args.port)
+ # name / prefix
+ prefix = args.name if args.name is not None else "ECHO_SERVER"
+
+ # echo
+ if args.echo < 0:
+ raise Exception("Echo count must be greater than zero")
+
+ # timeout
+ if args.timeout < 0.0:
+ raise Exception("Timeout must be greater than zero")
+
# logging
- logger = Logger(title = "TCP_echo_server port %d" % port,
- print_to_console = args.log,
- save_for_dump = False)
+ logger = EchoLogger(prefix = prefix,
+ title = "%s port %d" % (prefix, port),
+ print_to_console = args.log,
+ save_for_dump = False)
+
+ # the listening socket
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ main_except(lsock, port, args.echo, args.timeout, logger)
+
+ except KeyboardInterrupt:
+ pass
- main_except(port, logger)
- return 0
except Exception as e:
traceback.print_exc()
- return 1
+ retval = 1
+
+ if lsock is not None:
+ lsock.close()
+ return retval
if __name__ == "__main__":
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org