You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/30 13:42:10 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: Add self tests for tcp protocol adaptor

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new e29ea39  DISPATCH-1807: Add self tests for tcp protocol adaptor
e29ea39 is described below

commit e29ea396acfab940e54db36b569a865720ac9f8a
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Fri Oct 30 09:40:35 2020 -0400

    DISPATCH-1807: Add self tests for tcp protocol adaptor
    
    * echo_server and echo_client are restructured: each now provides a
      class (TcpEchoServer, TcpEchoClient) that does the actual echo work.
    
    * tcp_adaptor test runs the client and server test classes in
      separate threads and not in separate processes.
    
    This closes #905
---
 tests/TCP_echo_client.py          | 422 +++++++++++++++++++++-----------------
 tests/TCP_echo_server.py          | 357 +++++++++++++++++++-------------
 tests/system_tests_tcp_adaptor.py | 147 +++++++------
 3 files changed, 532 insertions(+), 394 deletions(-)

diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 5f2a687..57152e3 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -19,25 +19,33 @@
 # under the License.
 #
 
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
 import argparse
 import os
 import selectors
+import signal
 import socket
 import sys
+from threading import Thread
 import time
 import traceback
 import types
 
 from system_test import Logger
+from system_test import TIMEOUT
 
+class GracefulKiller:
+  kill_now = False
+  def __init__(self):
+    signal.signal(signal.SIGINT, self.exit_gracefully)
+    signal.signal(signal.SIGTERM, self.exit_gracefully)
 
-class EchoLogger(Logger):
-    def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
-        self.prefix = prefix + ' ' if len(prefix) > 0 else ''
-        super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
-
-    def log(self, msg):
-        super(EchoLogger, self).log(self.prefix + msg)
+  def exit_gracefully(self,signum, frame):
+    self.kill_now = True
 
 
 def split_chunk_for_display(raw_bytes):
@@ -55,192 +63,240 @@ def split_chunk_for_display(raw_bytes):
     return result
 
 
-def main_except(host, port, size, count, timeout, logger):
-    '''
-    :param host: connect to this host
-    :param port: connect to this port
-    :param size: size of individual payload chunks in bytes
-    :param count: number of payload chunks
-    :param strategy: "1" Send one payload;  # TODO
-                         Recv one payload
-    :param logger: Logger() object
-    :return:
-    '''
-    # Start up
-    start_time = time.time()
-    logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count))
-    keep_going = True
-    total_sent = 0
-    total_rcvd = 0
-
-    # outbound payload
-    payload_out = []
-    out_list_idx = 0  # current _out array being sent
-    out_byte_idx = 0  # next-to-send in current array
-    out_ready_to_send = True
-    # Generate unique content for each message so you can tell where the message
-    # or fragment belongs in the whole stream. Chunks look like:
-    #    b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
-    #    host: localhost
-    #    port: 33333
-    #    index: 6
-    #    offset into message: 0
-    MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
-    for idx in range(count):
-        body_msg = ""
-        padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
-        while len(body_msg) < size:
-            chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg))
-            padlen = MAGIC_SIZE - len(chunk)
-            chunk += padchar * padlen
-            body_msg += chunk
-        if len(body_msg) > size:
-            body_msg = body_msg[:size]
-        payload_out.append(bytearray(body_msg.encode()))
-    # incoming payloads
-    payload_in  = []
-    in_list_idx = 0 # current _in array being received
-    for i in range(count):
-        payload_in.append(bytearray())
-
-    # set up connection
-    host_address = (host, port)
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(host_address)
-    sock.setblocking(False)
-
-    # set up selector
-    sel = selectors.DefaultSelector()
-    sel.register(sock,
-                 selectors.EVENT_READ | selectors.EVENT_WRITE)
-
-    # event loop
-    while keep_going:
-        if timeout > 0.0:
-            elapsed = time.time() - start_time
-            if elapsed > timeout:
-                logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd))
-                break
-        for key, mask in sel.select(timeout=0.1):
-            sock = key.fileobj
-            if mask & selectors.EVENT_READ:
-                recv_data = sock.recv(1024)
-                if recv_data:
-                    total_rcvd = len(recv_data)
-                    payload_in[in_list_idx].extend(recv_data)
-                    if len(payload_in[in_list_idx]) == size:
-                        logger.log("Rcvd message %d" % in_list_idx)
-                        in_list_idx += 1
-                        if in_list_idx == count:
-                            # Received all bytes of all chunks - done.
-                            keep_going = False
-                            # Verify the received data
-                            for idxc in range(count):
-                                for idxs in range(size):
-                                    ob = payload_out[idxc][idxs]
-                                    ib = payload_in [idxc][idxs]
-                                    if ob != ib:
-                                        error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
-                                                % (idxc, idxs, repr(ob), repr(ib))
-                                        logger.log(error)
-                                        raise Exception(error)
+class TcpEchoClient():
+
+    def __init__(self, prefix, host, port, size, count, timeout, logger):
+        '''
+        :param host: connect to this host
+        :param port: connect to this port
+        :param size: size of individual payload chunks in bytes
+        :param count: number of payload chunks
+        :param strategy: "1" Send one payload;  # TODO
+                             Recv one payload
+        :param logger: Logger() object
+        :return:
+        '''
+        # Start up
+        self.sock = None
+        self.prefix = prefix
+        self.host = host
+        self.port = port
+        self.size = size
+        self.count = count
+        self.timeout = timeout
+        self.logger = logger
+        self.keep_running = True
+        self.is_running = False
+        self.exit_status = None
+        self.error = None
+        self._thread = Thread(target=self.run)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def run(self):
+        try:
+            start_time = time.time()
+            self.is_running = True
+            self.logger.log('%s Connecting to host:%s, port:%d, size:%d, count:%d' %
+                       (self.prefix, self.host, self.port, self.size, self.count))
+            total_sent = 0
+            total_rcvd = 0
+
+            # outbound payload
+            payload_out = []
+            out_list_idx = 0  # current _out array being sent
+            out_byte_idx = 0  # next-to-send in current array
+            out_ready_to_send = True
+            # Generate unique content for each message so you can tell where the message
+            # or fragment belongs in the whole stream. Chunks look like:
+            #    b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
+            #    host: localhost
+            #    port: 33333
+            #    index: 6
+            #    offset into message: 0
+            CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
+            for idx in range(self.count):
+                body_msg = ""
+                padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+                while len(body_msg) < self.size:
+                    chunk = "[%s:%d:%d:%d]" % (self.host, self.port, idx, len(body_msg))
+                    padlen = CONTENT_CHUNK_SIZE - len(chunk)
+                    chunk += padchar * padlen
+                    body_msg += chunk
+                if len(body_msg) > self.size:
+                    body_msg = body_msg[:self.size]
+                payload_out.append(bytearray(body_msg.encode()))
+            # incoming payloads
+            payload_in  = []
+            in_list_idx = 0 # current _in array being received
+            for i in range(self.count):
+                payload_in.append(bytearray())
+
+            # set up connection
+            host_address = (self.host, self.port)
+            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.sock.connect(host_address)
+            self.sock.setblocking(False)
+
+            # set up selector
+            sel = selectors.DefaultSelector()
+            sel.register(self.sock,
+                         selectors.EVENT_READ | selectors.EVENT_WRITE)
+
+            # event loop
+            while self.keep_running:
+                if self.timeout > 0.0:
+                    elapsed = time.time() - start_time
+                    if elapsed > self.timeout:
+                        self.exit_status = "%s Exiting due to timeout. Total sent= %d, total rcvd= %d" % \
+                                            (self.prefix, total_sent, total_rcvd)
+                        break
+                for key, mask in sel.select(timeout=0.1):
+                    sock = key.fileobj
+                    if mask & selectors.EVENT_READ:
+                        recv_data = sock.recv(1024)
+                        if recv_data:
+                            total_rcvd = len(recv_data)
+                            payload_in[in_list_idx].extend(recv_data)
+                            if len(payload_in[in_list_idx]) == self.size:
+                                self.logger.log("%s Rcvd message %d" % (self.prefix, in_list_idx))
+                                in_list_idx += 1
+                                if in_list_idx == self.count:
+                                    # Received all bytes of all chunks - done.
+                                    self.keep_running = False
+                                    # Verify the received data
+                                    for idxc in range(self.count):
+                                        for idxs in range(self.size):
+                                            ob = payload_out[idxc][idxs]
+                                            ib = payload_in [idxc][idxs]
+                                            if ob != ib:
+                                                self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
+                                                        % (self.prefix, idxc, idxs, repr(ob), repr(ib))
+                                                break
+                                else:
+                                    out_ready_to_send = True
+                                    sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
+                            elif len(payload_in[in_list_idx]) > self.size:
+                                self.error = "ERROR Received message too big. Expected:%d, actual:%d" % \
+                                              (self.size, len(payload_in[in_list_idx]))
+                                break
+                            else:
+                                pass # still accumulating a message
+                        else:
+                            # socket closed
+                            self.keep_running = False
+                    if self.keep_running and mask & selectors.EVENT_WRITE:
+                        if out_ready_to_send:
+                            n_sent = self.sock.send( payload_out[out_list_idx][out_byte_idx:] )
+                            total_sent += n_sent
+                            out_byte_idx += n_sent
+                            if out_byte_idx == self.size:
+                                self.logger.log("%s Sent message %d" % (self.prefix, out_list_idx))
+                                out_byte_idx = 0
+                                out_list_idx += 1
+                                sel.modify(self.sock, selectors.EVENT_READ) # turn off write events
+                                out_ready_to_send = False # turn on when rcvr receives
                         else:
-                            out_ready_to_send = True
-                            sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
-                    elif len(payload_in[in_list_idx]) > size:
-                        error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \
-                                (size, len(payload_in[in_list_idx]))
-                        logger.log(error)
-                        raise Exception(error)
-                    else:
-                        pass # still accumulating a message
-                else:
-                    # socket closed
-                    keep_going = False
-            if mask & selectors.EVENT_WRITE:
-                if out_ready_to_send:
-                    n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] )
-                    total_sent += n_sent
-                    out_byte_idx += n_sent
-                    if out_byte_idx == size:
-                        logger.log("Sent message %d" % out_list_idx)
-                        out_byte_idx = 0
-                        out_list_idx += 1
-                        sel.modify(sock, selectors.EVENT_READ) # turn off write events
-                        out_ready_to_send = False # turn on when rcvr receives
-                else:
-                    logger.log("DEBUG: ignoring EVENT_WRITE")
-
-    # shut down
-    sel.unregister(sock)
-    sock.close()
+                            pass # logger.log("DEBUG: ignoring EVENT_WRITE")
+
+            # shut down
+            sel.unregister(self.sock)
+            self.sock.close()
+
+        except Exception as exc:
+            self.error = "ERROR: exception : '%s'" % traceback.format_exc()
+
+        self.is_running = False
+
+    def wait(self, timeout=TIMEOUT):
+        self.logger.log("%s Client is shutting down" % (self.prefix))
+        self.keep_running = False
+        self._thread.join(timeout)
+
 
 def main(argv):
-    try:
-        # parse args
-        p = argparse.ArgumentParser()
-        p.add_argument('--host', '-b',
-                       help='Required target host')
-        p.add_argument('--port', '-p', type=int,
-                       help='Required target port number')
-        p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?',
-                       help='Size of payload in bytes')
-        p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
-                       help='Number of payloads to process')
-        p.add_argument('--name',
-                       help='Optional logger prefix')
-        p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
-                       help='Timeout in seconds. Default value "0" disables timeouts')
-        p.add_argument('--log', '-l',
-                       action='store_true',
-                       help='Write activity log to console')
-        del argv[0]
-        args = p.parse_args(argv)
-
-        # host
-        if args.host is None:
-            raise Exception("User must specify a host")
-        host = args.host
-
-        # port
-        if args.port is None:
-            raise Exception("User must specify a port number")
-        port = args.port
-
-        # size
-        if args.size <= 0:
-            raise Exception("Size must be greater than zero")
-        size = args.size
-
-        # count
-        if args.count <= 0:
-            raise Exception("Count must be greater than zero")
-        count = args.count
-
-        # name / prefix
-        prefix = args.name if args.name is not None else "ECHO_CLIENT"
-
-        # timeout
-        if args.timeout < 0.0:
-            raise Exception("Timeout must be greater than or equal to zero")
+    retval = 0
+    # parse args
+    p = argparse.ArgumentParser()
+    p.add_argument('--host', '-b',
+                   help='Required target host')
+    p.add_argument('--port', '-p', type=int,
+                   help='Required target port number')
+    p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?',
+                   help='Size of payload in bytes')
+    p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
+                   help='Number of payloads to process')
+    p.add_argument('--name',
+                   help='Optional logger prefix')
+    p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+                   help='Timeout in seconds. Default value "0" disables timeouts')
+    p.add_argument('--log', '-l',
+                   action='store_true',
+                   help='Write activity log to console')
+    del argv[0]
+    args = p.parse_args(argv)
+
+    # host
+    if args.host is None:
+        raise Exception("User must specify a host")
+    host = args.host
+
+    # port
+    if args.port is None:
+        raise Exception("User must specify a port number")
+    port = args.port
+
+    # size
+    if args.size <= 0:
+        raise Exception("Size must be greater than zero")
+    size = args.size
 
+    # count
+    if args.count <= 0:
+        raise Exception("Count must be greater than zero")
+    count = args.count
+
+    # name / prefix
+    prefix = args.name if args.name is not None else "ECHO_CLIENT (%d_%d_%d)" % \
+                                                     (port, size, count)
+
+    # timeout
+    if args.timeout < 0.0:
+        raise Exception("Timeout must be greater than or equal to zero")
+
+    killer = GracefulKiller()
+    client = None
+
+    try:
         # logging
-        logger = EchoLogger(prefix=prefix,
-                            title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
-                            print_to_console = args.log,
-                            save_for_dump = False)
+        logger = Logger(title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
+                        print_to_console = args.log,
+                        save_for_dump = False)
 
-        main_except(host, port, size, count, args.timeout, logger)
-        return 0
+        client = TcpEchoClient(prefix, host, port, size, count, args.timeout, logger)
 
-    except KeyboardInterrupt:
-        logger.log("Exiting due to KeyboardInterrupt")
-        return 0
+        keep_running = True
+        while keep_running:
+            time.sleep(0.1)
+            if client.error is not None:
+                logger.log("%s Client stopped with error: %s" % (prefix, client.error))
+                keep_running = False
+                retval = 1
+            if client.exit_status is not None:
+                logger.log("%s Client stopped with status: %s" % (prefix, client.exit_status))
+                keep_running = False
+            if killer.kill_now:
+                logger.log("%s Process killed with signal" % (prefix))
+                keep_running = False
+            if keep_running and not client.is_running:
+                logger.log("%s Client stopped with no error or status" % (prefix))
+                keep_running = False
 
     except Exception as e:
-        traceback.print_exc()
-        return 1
+        logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
+        retval = 1
 
+    return retval
 
 if __name__ == "__main__":
     sys.exit(main(sys.argv))
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 28fc2ae..34f72bb 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -19,19 +19,24 @@
 # under the License.
 #
 
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
 import argparse
 import os
 import selectors
-from signal import signal, SIGINT
+import signal
 import socket
 import sys
+from threading import Thread
 import time
 import traceback
 import types
 
 from system_test import Logger
-
-HOST = '127.0.0.1'
+from system_test import TIMEOUT
 
 class ClientRecord(object):
     """
@@ -54,176 +59,234 @@ class ClientRecord(object):
         return self.__repr__()
 
 
-class EchoLogger(Logger):
-    def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
-        self.prefix = prefix + ' ' if len(prefix) > 0 else ''
-        super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
-    
-    def log(self, msg):
-        super(EchoLogger, self).log(self.prefix + msg)
+class GracefulKiller:
+  kill_now = False
+  def __init__(self):
+    signal.signal(signal.SIGINT, self.exit_gracefully)
+    signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+  def exit_gracefully(self,signum, frame):
+    self.kill_now = True
 
 
 def split_chunk_for_display(raw_bytes):
     """
     Given some raw bytes, return a display string
-    Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+    Only show the beginning and end of largish (2x CONTENT_CHUNK_SIZE) arrays.
     :param raw_bytes:
     :return: display string
     """
-    MAGIC_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
-    if len(raw_bytes) > 2 * MAGIC_SIZE:
-        result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+    CONTENT_CHUNK_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
+    if len(raw_bytes) > 2 * CONTENT_CHUNK_SIZE:
+        result = repr(raw_bytes[:CONTENT_CHUNK_SIZE]) + " ... " + repr(raw_bytes[-CONTENT_CHUNK_SIZE:])
     else:
         result = repr(raw_bytes)
     return result
 
 
-def main_except(sock, port, echo_count, timeout, logger):
-    '''
-    :param lsock: socket to listen on
-    :param port: port to listen on
-    :param echo_count: exit after echoing this many bytes
-    :param timeout: exit after this many seconds
-    :param logger: Logger() object
-    :return:
-    '''
-    # set up spontaneous exit settings
-    start_time = time.time()
-    total_echoed = 0
-
-    # set up listening socket
-    sock.bind((HOST, port))
-    sock.listen()
-    sock.setblocking(False)
-    logger.log('Listening on host:%s, port:%d' % (HOST, port))
-
-    # set up selector
-    sel = selectors.DefaultSelector()
-    sel.register(sock, selectors.EVENT_READ, data=None)
-
-    # event loop
-    while True:
-        if timeout > 0.0:
-            elapsed = time.time() - start_time
-            if elapsed > timeout:
-                logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed)
-                break
-        if echo_count > 0:
-            if total_echoed >= echo_count:
-                logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed)
-                break
-        events = sel.select(timeout=0.1)
-        if events:
-            for key, mask in events:
-                if key.data is None:
-                    if key.fileobj is sock:
-                        do_accept(key.fileobj, sel, logger)
-                    else:
-                        raise Exception("Only listener 'sock' has None in opaque data field")
+class TcpEchoServer():
+    def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None):
+        '''
+        Start echo server in separate thread
+
+        :param prefix: log prefix
+        :param port: port to listen on
+        :param echo_count: exit after echoing this many bytes
+        :param timeout: exit after this many seconds
+        :param logger: Logger() object
+        :return:
+        '''
+        self.sock = None
+        self.prefix = prefix
+        self.port = port
+        self.echo_count = echo_count
+        self.timeout = timeout
+        self.logger = logger
+        self.keep_running = True
+        self.HOST = '127.0.0.1'
+        self.is_running = False
+        self.exit_status = None
+        self.error = None
+        self._thread = Thread(target=self.run)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def run(self):
+        """
+        Run server in daemon thread
+        :return:
+        """
+        try:
+            # set up spontaneous exit settings
+            self.is_running = True
+            start_time = time.time()
+            total_echoed = 0
+
+            # set up listening socket
+            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.sock.bind((self.HOST, self.port))
+            self.sock.listen()
+            self.sock.setblocking(False)
+            self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port))
+
+            # set up selector
+            sel = selectors.DefaultSelector()
+            sel.register(self.sock, selectors.EVENT_READ, data=None)
+
+            # event loop
+            while True:
+                if not self.keep_running:
+                    self.exit_status = "INFO: command shutdown:"
+                    break
+                if self.timeout > 0.0:
+                    elapsed = time.time() - start_time
+                    if elapsed > self.timeout:
+                        self.exit_status = "Exiting due to timeout. Total echoed = %d" % total_echoed
+                        break
+                if self.echo_count > 0:
+                    if total_echoed >= self.echo_count:
+                        self.exit_status = "Exiting due to echo byte count. Total echoed = %d" % total_echoed
+                        break
+                events = sel.select(timeout=0.1)
+                if events:
+                    for key, mask in events:
+                        if key.data is None:
+                            if key.fileobj is self.sock:
+                                self.do_accept(key.fileobj, sel, self.logger)
+                            else:
+                                pass # Only listener 'sock' has None in opaque data field
+                        else:
+                            total_echoed += self.do_service(key, mask, sel, self.logger)
+                else:
+                    pass # select timeout. probably.
+
+            sel.unregister(self.sock)
+            self.sock.close()
+
+        except Exception as exc:
+            self.error = "ERROR: exception : '%s'" % traceback.format_exc()
+
+        self.is_running = False
+
+    def do_accept(self, sock, sel, logger):
+        conn, addr = sock.accept()
+        logger.log('%s Accepted connection from %s:%d' % (self.prefix, addr[0], addr[1]))
+        conn.setblocking(False)
+        events = selectors.EVENT_READ | selectors.EVENT_WRITE
+        sel.register(conn, events, data=ClientRecord(addr))
+
+    def do_service(self, key, mask, sel, logger):
+        retval = 0
+        sock = key.fileobj
+        data = key.data
+        if mask & selectors.EVENT_READ:
+            try:
+                recv_data = sock.recv(1024)
+            except ConnectionResetError as exc:
+                logger.log('%s Connection to %s:%d closed by peer' % (self.prefix, data.addr[0], data.addr[1]))
+                sel.unregister(sock)
+                sock.close()
+                return 0
+            if recv_data:
+                data.outb += recv_data
+                logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data),
+                                                            split_chunk_for_display(recv_data)))
+                sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
+            else:
+                logger.log('%s Closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1]))
+                sel.unregister(sock)
+                sock.close()
+                return 0
+        if mask & selectors.EVENT_WRITE:
+            if data.outb:
+                sent = sock.send(data.outb)
+                retval += sent
+                if sent > 0:
+                    logger.log('%s write to : %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], sent,
+                                                                split_chunk_for_display(data.outb[:sent])))
                 else:
-                    total_echoed += do_service(key, mask, sel, logger)
-        else:
-            pass # select timeout. probably.
+                    logger.log('%s write to : %s:%d len:0' % (self.prefix, data.addr[0], data.addr[1]))
+                data.outb = data.outb[sent:]
+            else:
+                sel.modify(sock, selectors.EVENT_READ, data=data)
+        return retval
 
-    sel.unregister(sock)
-    sock.close()
+    def wait(self, timeout=TIMEOUT):
+        self.logger.log("%s Server is shutting down" % (self.prefix))
+        self.keep_running = False
+        self._thread.join(timeout)
 
-def do_accept(sock, sel, logger):
-    conn, addr = sock.accept()
-    logger.log('Accepted connection from %s:%d' % (addr[0], addr[1]))
-    conn.setblocking(False)
-    events = selectors.EVENT_READ | selectors.EVENT_WRITE
-    sel.register(conn, events, data=ClientRecord(addr))
-
-def do_service(key, mask, sel, logger):
-    retval = 0
-    sock = key.fileobj
-    data = key.data
-    if mask & selectors.EVENT_READ:
-        recv_data = sock.recv(1024)
-        if recv_data:
-            data.outb += recv_data
-            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data),
-                                                        split_chunk_for_display(recv_data)))
-            sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
-        else:
-            logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
-            sel.unregister(sock)
-            sock.close()
-            return 0
-    if mask & selectors.EVENT_WRITE:
-        if data.outb:
-            sent = sock.send(data.outb)
-            retval += sent
-            if sent > 0:
-                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent,
-                                                            split_chunk_for_display(data.outb[:sent])))
-            else:
-                logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
-            data.outb = data.outb[sent:]
-        else:
-            #logger.log('write event with no data' + str(data))
-            sel.modify(sock, selectors.EVENT_READ, data=data)
-    return retval
 
 def main(argv):
     retval = 0
-    try:
-        # parse args
-        p = argparse.ArgumentParser()
-        p.add_argument('--port', '-p',
-                       help='Required listening port number')
-        p.add_argument('--name',
-                       help='Optional logger prefix')
-        p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
-                       help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
-        p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
-                       help='Timeout in seconds. Default value "0" disables timeouts')
-        p.add_argument('--log', '-l',
-                       action='store_true',
-                       help='Write activity log to console')
-        del argv[0]
-        args = p.parse_args(argv)
-
-        lsock = None
-
-        # port
-        if args.port is None:
-            raise Exception("User must specify a port number")
-        port = int(args.port)
-
-        # name / prefix
-        prefix = args.name if args.name is not None else "ECHO_SERVER"
-
-        # echo
-        if args.echo < 0:
-            raise Exception("Echo count must be greater than zero")
-
-        # timeout
-        if args.timeout < 0.0:
-            raise Exception("Timeout must be greater than or equal to zero")
+    # parse args
+    p = argparse.ArgumentParser()
+    p.add_argument('--port', '-p',
+                   help='Required listening port number')
+    p.add_argument('--name',
+                   help='Optional logger prefix')
+    p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
+                   help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
+    p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+                   help='Timeout in seconds. Default value "0" disables timeouts')
+    p.add_argument('--log', '-l',
+                   action='store_true',
+                   help='Write activity log to console')
+    del argv[0]
+    args = p.parse_args(argv)
+
+    # port
+    if args.port is None:
+        raise Exception("User must specify a port number")
+    port = int(args.port)
+
+    # name / prefix
+    prefix = args.name if args.name is not None else "ECHO_SERVER (%s)" % (str(port))
+
+    # echo
+    if args.echo < 0:
+        raise Exception("Echo count must be greater than zero")
+
+    # timeout
+    if args.timeout < 0.0:
+        raise Exception("Timeout must be greater than or equal to zero")
+
+    killer = GracefulKiller()
+    server = None
 
+    try:
         # logging
-        logger = EchoLogger(prefix = prefix,
-                            title = "%s port %d" % (prefix, port),
-                            print_to_console = args.log,
-                            save_for_dump = False)
+        logger = Logger(title = "%s port %d" % (prefix, port),
+                        print_to_console = args.log,
+                        save_for_dump = False)
+
+        server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger)
+
+        keep_running = True
+        while keep_running:
+            time.sleep(0.1)
+            if server.error is not None:
+                logger.log("%s Server stopped with error: %s" % (prefix, server.error))
+                keep_running = False
+                retval = 1
+            if server.exit_status is not None:
+                logger.log("%s Server stopped with status: %s" % (prefix, server.exit_status))
+                keep_running = False
+            if killer.kill_now:
+                logger.log("%s Process killed with signal" % (prefix))
+                keep_running = False
+            if keep_running and not server.is_running:
+                logger.log("%s Server stopped with no error or status" % (prefix))
+                keep_running = False
 
-        # the listening socket
-        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
-        main_except(lsock, port, args.echo, args.timeout, logger)
-
-    except KeyboardInterrupt:
-        logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed)
 
     except Exception as e:
-        logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed)
-        traceback.print_exc()
+        logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
         retval = 1
 
-    if lsock is not None:
-        lsock.close()
+    if server is not None and server.sock is not None:
+        server.sock.close()
+
     return retval
 
 
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 8120937..c2df88d 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -23,38 +23,24 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 import os
+import traceback
 from time import sleep
 from threading import Event
 from threading import Timer
 
-from proton import Message, Timeout
-from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout
-from system_test import AsyncTestReceiver
-from system_test import AsyncTestSender
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
 from system_test import Logger
 from system_test import QdManager
 from system_test import unittest
 from system_test import Process
-from system_tests_link_routes import ConnLinkRouteService
-from test_broker import FakeBroker
-from test_broker import FakeService
-from proton.handlers import MessagingHandler
-from proton.reactor import Container, DynamicNodeProperties
-from proton.utils import BlockingConnection
+from system_test import DIR
 from qpid_dispatch.management.client import Node
-from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments
 from subprocess import PIPE, STDOUT
-import re
+from TCP_echo_client import TcpEchoClient
+from TCP_echo_server import TcpEchoServer
 
-class AddrTimer(object):
-    def __init__(self, parent):
-        self.parent = parent
 
-    def on_timer_task(self, event):
-        self.parent.check_address()
-
-
-class TcpAdaptorOneRouterEcho(TestCase):
+class TcpAdaptorOneRouterEcho(TestCase, Process):
     """
     Run echo tests through a stand-alone router
     """
@@ -64,7 +50,7 @@ class TcpAdaptorOneRouterEcho(TestCase):
 
     @classmethod
     def setUpClass(cls):
-        """Start a router and echo server"""
+        """Start a router"""
         super(TcpAdaptorOneRouterEcho, cls).setUpClass()
 
         def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None):
@@ -92,54 +78,87 @@ class TcpAdaptorOneRouterEcho(TestCase):
         cls.tcp_client_listener_port = cls.tester.get_port()
         cls.tcp_server_listener_port = cls.tester.get_port()
 
-        router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
+        router('A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
                cls.tcp_server_listener_port, "some_address", "best_site")
 
-        cls.logger = Logger(title="TCP echo one router", print_to_console=True)
-
-    @classmethod
-    def tearDownClass(cls):
-        pass
-
-    def spawn_echo_server(self, port, expect=None):
-        cmd = ["TCP_echo_server.py",
-               "--port", str(port),
-               "--log"]
-        return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect,
-                          universal_newlines=True)
-
-    def spawn_echo_client(self, logger, host, port, size, count, expect=None):
-        if expect is None:
-            expect = Process.EXIT_OK
-        cmd = ["TCP_echo_client.py",
-               "--host", host,
-               "--port", str(port),
-               "--size", str(size),
-               "--count", str(count),
-               "--log"]
-        logger.log("Start client. cmd=%s" % str(cmd))
-        return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect,
-                          universal_newlines=True)
-
-    def do_test_echo(self, logger, host, port, size, count):
-        # start echo client
-        client = self.spawn_echo_client(logger, host, port, size, count)
-        cl_text, cl_error = client.communicate(timeout=TIMEOUT)
-        if client.returncode:
-            raise Exception("Echo client failed size:%d, count:%d : %s %s" %
-                            (size, count, cl_text, cl_error))
+        cls.logger = Logger(title="TcpAdaptorOneRouterEcho-testClass", print_to_console=True)
+
+    def do_test_echo(self, test_name, logger, host, port, size, count):
+        # Run one echo client. Return True unless it reports an error or exit status, or raises.
+        name = "%s_%s_%s_%s" % (test_name, port, size, count)
+        client_prefix = "ECHO_CLIENT %s" % name
+        client_logger = Logger(title=client_prefix, print_to_console=False, save_for_dump=True)
+        result = True # assume it works
+        try:
+            # start client
+            client = TcpEchoClient(prefix=client_prefix,
+                                   host=host,
+                                   port=port,
+                                   size=size,
+                                   count=count,
+                                   timeout=TIMEOUT,
+                                   logger=client_logger)
+            #assert client.is_running
+
+            # wait for client to finish
+            keep_waiting = True
+            while keep_waiting:
+                sleep(0.1)
+                if client.error is not None:
+                    logger.log("%s Client stopped with error: %s" % (name, client.error))
+                    keep_waiting = False
+                    result = False
+                if client.exit_status is not None:
+                    logger.log("%s Client stopped with status: %s" % (name, client.exit_status))
+                    keep_waiting = False
+                    result = False
+                if keep_waiting and not client.is_running:
+                    logger.log("%s Client stopped with no error or status" % (name))
+                    keep_waiting = False
+
+        except Exception as exc:
+            logger.log("EchoClient %s failed. Exception: %s" %
+                       (name, traceback.format_exc()))
+            result = False
+
+        if not result:
+            # On failure, replay the client's saved log lines through the test logger.
+            for line in client_logger.logs:
+                logger.log("Failed client log: %s" % line)
+        return result
 
     def test_01_tcp_echo_one_router(self):
+        """
+        Run one echo server.
+        Run many echo clients.
+        :return:
+        """
         # start echo server
-        #server = self.spawn_echo_server(self.tcp_server_listener_port)
-
-        #for size in [1, 5, 10, 50, 100]:
-        #    for count in [1, 5, 10, 50, 100]:
-        #        self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" %
-        #                   (self.tcp_client_listener_port, size, count))
-        #        self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count)
-        #server.join()
-        pass
+        test_name = "test_01_tcp_echo_one_router"
+        server_prefix = "ECHO_SERVER %s" % test_name
+        server_logger = Logger(title=test_name, print_to_console=False, save_for_dump=True)
+        server = TcpEchoServer(prefix=server_prefix,
+                               port=self.tcp_server_listener_port,
+                               timeout=TIMEOUT,
+                               logger=server_logger)
+        assert server.is_running
+
+        # run series of clients to test
+        result = True
+        for size in [1]:
+            for count in [1]:
+                test_info = "Starting echo client %s host:localhost, port:%d, size:%d, count:%d" % \
+                           (test_name, self.tcp_client_listener_port, size, count)
+                self.logger.log(test_info)
+                result = self.do_test_echo(test_name, self.logger, "localhost",
+                                           self.tcp_client_listener_port, size, count)
+                if not result:
+                    break
+            if not result:
+                break
+        # stop echo server
+        server.wait()
+        assert result, "Test case failed %s" % test_info
 
 if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org