You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/27 13:04:06 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: TCP self test - improve test server

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 4aff90c  DISPATCH-1807: TCP self test - improve test server
4aff90c is described below

commit 4aff90cbd9d31140da437ffdf2e3aca9a999e9a2
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 09:02:06 2020 -0400

    DISPATCH-1807: TCP self test - improve test server
    
    * Add timeout
    * Add exit-on-byte-count
    * Improve logging for merge with Scraper
---
 tests/TCP_echo_server.py | 132 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 112 insertions(+), 20 deletions(-)

diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 61e95a2..bdfecfe 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -22,8 +22,10 @@
 import argparse
 import os
 import selectors
+from signal import signal, SIGINT
 import socket
 import sys
+import time
 import traceback
 import types
 
@@ -31,14 +33,50 @@ from system_test import Logger
 
 HOST = '127.0.0.1'
 
-def main_except(port, logger):
+class ClientRecord(object):
+    """
+    Object to register with the selector 'data' field
+    for incoming user connections. This is *not* used
+    for the listening socket.
+    This object holds the socketId in the address and
+    the inbound and outbound data list buffers for this
+    socket's payload.
+    """
+    def __init__(self, address):
+        self.addr = address
+        self.inb = b''
+        self.outb = b''
+
+    def __repr__(self):
+        return str(self.addr) + " len(in)=" + str(len(self.inb)) + " len(out)=" + str(len(self.outb))
+
+    def __str__(self):
+        return self.__repr__()
+
+
+class EchoLogger(Logger):
+    def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
+        self.prefix = prefix + ' ' if len(prefix) > 0 else ''
+        super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
+    
+    def log(self, msg):
+        super(EchoLogger, self).log(self.prefix + msg)
+
+
+def main_except(sock, port, echo_count, timeout, logger):
     '''
+    :param lsock: socket to listen on
     :param port: port to listen on
+    :param echo_count: exit after echoing this many bytes
+    :param timeout: exit after this many seconds
     :param logger: Logger() object
     :return:
     '''
+    # set up spontaneous exit settings
+    start_time = time.time()
+    total_echoed = 0
+
     # set up listening socket
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.bind((HOST, port))
     sock.listen()
     sock.setblocking(False)
@@ -50,29 +88,48 @@ def main_except(port, logger):
 
     # event loop
     while True:
-        events = sel.select(timeout=None)
-        for key, mask in events:
-            if key.data is None:
-                do_accept(key.fileobj, sel, logger)
-            else:
-                do_service(key, mask, sel, logger)
+        if timeout > 0.0:
+            elapsed = time.time() - start_time
+            if elapsed > timeout:
+                logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed)
+                break
+        if echo_count > 0:
+            if total_echoed >= echo_count:
+                logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed)
+                break
+        events = sel.select(timeout=0.1)
+        if events:
+            for key, mask in events:
+                if key.data is None:
+                    if key.fileobj is sock:
+                        do_accept(key.fileobj, sel, logger)
+                    else:
+                        assert(False, "Only listener 'sock' has None in opaque data field")
+                else:
+                    total_echoed += do_service(key, mask, sel, logger)
+        else:
+            pass # select timeout. probably.
+
+    sel.unregister(sock)
+    sock.close()
 
 def do_accept(sock, sel, logger):
     conn, addr = sock.accept()
     logger.log('Accepted connection from %s:%d' % (addr[0], addr[1]))
     conn.setblocking(False)
-    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
     events = selectors.EVENT_READ | selectors.EVENT_WRITE
-    sel.register(conn, events, data=data)
+    sel.register(conn, events, data=ClientRecord(addr))
 
 def do_service(key, mask, sel, logger):
+    retval = 0
     sock = key.fileobj
     data = key.data
     if mask & selectors.EVENT_READ:
         recv_data = sock.recv(1024)
         if recv_data:
             data.outb += recv_data
-            logger.log('ECHO read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+            sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
         else:
             logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
             sel.unregister(sock)
@@ -80,40 +137,75 @@ def do_service(key, mask, sel, logger):
     if mask & selectors.EVENT_WRITE:
         if data.outb:
             sent = sock.send(data.outb)
+            retval += sent
             if sent > 0:
-                logger.log('ECHO write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
+                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
             else:
-                logger.log('ECHO write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
+                logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
             data.outb = data.outb[sent:]
-
+        else:
+            logger.log('write event with no data' + str(data))
+            sel.modify(sock, selectors.EVENT_READ, data=data)
+    return retval
 
 def main(argv):
+    retval = 0
     try:
         # parse args
         p = argparse.ArgumentParser()
         p.add_argument('--port', '-p',
                        help='Required listening port number')
+        p.add_argument('--name',
+                       help='Optional logger prefix')
+        p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
+                       help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
+        p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+                       help='Timeout in seconds. Default value "0" disables timeouts')
         p.add_argument('--log', '-l',
                        action='store_true',
                        help='Write activity log to console')
         del argv[0]
         args = p.parse_args(argv)
 
+        lsock = None
+
         # port
         if args.port is None:
             raise Exception("User must specify a port number")
         port = int(args.port)
 
+        # name / prefix
+        prefix = args.name if args.name is not None else "ECHO_SERVER"
+
+        # echo
+        if args.echo < 0:
+            raise Exception("Echo count must be greater than zero")
+
+        # timeout
+        if args.timeout < 0.0:
+            raise Exception("Timeout must be greater than zero")
+
         # logging
-        logger = Logger(title = "TCP_echo_server port %d" % port,
-                        print_to_console = args.log,
-                        save_for_dump = False)
+        logger = EchoLogger(prefix = prefix,
+                            title = "%s port %d" % (prefix, port),
+                            print_to_console = args.log,
+                            save_for_dump = False)
+
+        # the listening socket
+        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+        main_except(lsock, port, args.echo, args.timeout, logger)
+
+    except KeyboardInterrupt:
+        pass
 
-        main_except(port, logger)
-        return 0
     except Exception as e:
         traceback.print_exc()
-        return 1
+        retval = 1
+
+    if lsock is not None:
+        lsock.close()
+    return retval
 
 
 if __name__ == "__main__":


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org