You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/30 13:42:10 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1807: Add self tests for tcp protocol adaptor
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new e29ea39 DISPATCH-1807: Add self tests for tcp protocol adaptor
e29ea39 is described below
commit e29ea396acfab940e54db36b569a865720ac9f8a
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Fri Oct 30 09:40:35 2020 -0400
DISPATCH-1807: Add self tests for tcp protocol adaptor
* echo_server and echo_client modified to have a test class
that runs the test proper.
* tcp_adaptor test runs the client and server test classes in
separate threads and not in separate processes.
This closes #905
---
tests/TCP_echo_client.py | 422 +++++++++++++++++++++-----------------
tests/TCP_echo_server.py | 357 +++++++++++++++++++-------------
tests/system_tests_tcp_adaptor.py | 147 +++++++------
3 files changed, 532 insertions(+), 394 deletions(-)
diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 5f2a687..57152e3 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -19,25 +19,33 @@
# under the License.
#
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
import argparse
import os
import selectors
+import signal
import socket
import sys
+from threading import Thread
import time
import traceback
import types
from system_test import Logger
+from system_test import TIMEOUT
+class GracefulKiller:
+ kill_now = False
+ def __init__(self):
+ signal.signal(signal.SIGINT, self.exit_gracefully)
+ signal.signal(signal.SIGTERM, self.exit_gracefully)
-class EchoLogger(Logger):
- def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
- self.prefix = prefix + ' ' if len(prefix) > 0 else ''
- super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
-
- def log(self, msg):
- super(EchoLogger, self).log(self.prefix + msg)
+ def exit_gracefully(self,signum, frame):
+ self.kill_now = True
def split_chunk_for_display(raw_bytes):
@@ -55,192 +63,240 @@ def split_chunk_for_display(raw_bytes):
return result
-def main_except(host, port, size, count, timeout, logger):
- '''
- :param host: connect to this host
- :param port: connect to this port
- :param size: size of individual payload chunks in bytes
- :param count: number of payload chunks
- :param strategy: "1" Send one payload; # TODO
- Recv one payload
- :param logger: Logger() object
- :return:
- '''
- # Start up
- start_time = time.time()
- logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count))
- keep_going = True
- total_sent = 0
- total_rcvd = 0
-
- # outbound payload
- payload_out = []
- out_list_idx = 0 # current _out array being sent
- out_byte_idx = 0 # next-to-send in current array
- out_ready_to_send = True
- # Generate unique content for each message so you can tell where the message
- # or fragment belongs in the whole stream. Chunks look like:
- # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
- # host: localhost
- # port: 33333
- # index: 6
- # offset into message: 0
- MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
- for idx in range(count):
- body_msg = ""
- padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
- while len(body_msg) < size:
- chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg))
- padlen = MAGIC_SIZE - len(chunk)
- chunk += padchar * padlen
- body_msg += chunk
- if len(body_msg) > size:
- body_msg = body_msg[:size]
- payload_out.append(bytearray(body_msg.encode()))
- # incoming payloads
- payload_in = []
- in_list_idx = 0 # current _in array being received
- for i in range(count):
- payload_in.append(bytearray())
-
- # set up connection
- host_address = (host, port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(host_address)
- sock.setblocking(False)
-
- # set up selector
- sel = selectors.DefaultSelector()
- sel.register(sock,
- selectors.EVENT_READ | selectors.EVENT_WRITE)
-
- # event loop
- while keep_going:
- if timeout > 0.0:
- elapsed = time.time() - start_time
- if elapsed > timeout:
- logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd))
- break
- for key, mask in sel.select(timeout=0.1):
- sock = key.fileobj
- if mask & selectors.EVENT_READ:
- recv_data = sock.recv(1024)
- if recv_data:
- total_rcvd = len(recv_data)
- payload_in[in_list_idx].extend(recv_data)
- if len(payload_in[in_list_idx]) == size:
- logger.log("Rcvd message %d" % in_list_idx)
- in_list_idx += 1
- if in_list_idx == count:
- # Received all bytes of all chunks - done.
- keep_going = False
- # Verify the received data
- for idxc in range(count):
- for idxs in range(size):
- ob = payload_out[idxc][idxs]
- ib = payload_in [idxc][idxs]
- if ob != ib:
- error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
- % (idxc, idxs, repr(ob), repr(ib))
- logger.log(error)
- raise Exception(error)
+class TcpEchoClient():
+
+ def __init__(self, prefix, host, port, size, count, timeout, logger):
+ '''
+ :param host: connect to this host
+ :param port: connect to this port
+ :param size: size of individual payload chunks in bytes
+ :param count: number of payload chunks
+ :param strategy: "1" Send one payload; # TODO
+ Recv one payload
+ :param logger: Logger() object
+ :return:
+ '''
+ # Start up
+ self.sock = None
+ self.prefix = prefix
+ self.host = host
+ self.port = port
+ self.size = size
+ self.count = count
+ self.timeout = timeout
+ self.logger = logger
+ self.keep_running = True
+ self.is_running = False
+ self.exit_status = None
+ self.error = None
+ self._thread = Thread(target=self.run)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def run(self):
+ try:
+ start_time = time.time()
+ self.is_running = True
+ self.logger.log('%s Connecting to host:%s, port:%d, size:%d, count:%d' %
+ (self.prefix, self.host, self.port, self.size, self.count))
+ total_sent = 0
+ total_rcvd = 0
+
+ # outbound payload
+ payload_out = []
+ out_list_idx = 0 # current _out array being sent
+ out_byte_idx = 0 # next-to-send in current array
+ out_ready_to_send = True
+ # Generate unique content for each message so you can tell where the message
+ # or fragment belongs in the whole stream. Chunks look like:
+ # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
+ # host: localhost
+ # port: 33333
+ # index: 6
+ # offset into message: 0
+ CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
+ for idx in range(self.count):
+ body_msg = ""
+ padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+ while len(body_msg) < self.size:
+ chunk = "[%s:%d:%d:%d]" % (self.host, self.port, idx, len(body_msg))
+ padlen = CONTENT_CHUNK_SIZE - len(chunk)
+ chunk += padchar * padlen
+ body_msg += chunk
+ if len(body_msg) > self.size:
+ body_msg = body_msg[:self.size]
+ payload_out.append(bytearray(body_msg.encode()))
+ # incoming payloads
+ payload_in = []
+ in_list_idx = 0 # current _in array being received
+ for i in range(self.count):
+ payload_in.append(bytearray())
+
+ # set up connection
+ host_address = (self.host, self.port)
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect(host_address)
+ self.sock.setblocking(False)
+
+ # set up selector
+ sel = selectors.DefaultSelector()
+ sel.register(self.sock,
+ selectors.EVENT_READ | selectors.EVENT_WRITE)
+
+ # event loop
+ while self.keep_running:
+ if self.timeout > 0.0:
+ elapsed = time.time() - start_time
+ if elapsed > self.timeout:
+ self.exit_status = "%s Exiting due to timeout. Total sent= %d, total rcvd= %d" % \
+ (self.prefix, total_sent, total_rcvd)
+ break
+ for key, mask in sel.select(timeout=0.1):
+ sock = key.fileobj
+ if mask & selectors.EVENT_READ:
+ recv_data = sock.recv(1024)
+ if recv_data:
+ total_rcvd = len(recv_data)
+ payload_in[in_list_idx].extend(recv_data)
+ if len(payload_in[in_list_idx]) == self.size:
+ self.logger.log("%s Rcvd message %d" % (self.prefix, in_list_idx))
+ in_list_idx += 1
+ if in_list_idx == self.count:
+ # Received all bytes of all chunks - done.
+ self.keep_running = False
+ # Verify the received data
+ for idxc in range(self.count):
+ for idxs in range(self.size):
+ ob = payload_out[idxc][idxs]
+ ib = payload_in [idxc][idxs]
+ if ob != ib:
+ self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
+ % (self.prefix, idxc, idxs, repr(ob), repr(ib))
+ break
+ else:
+ out_ready_to_send = True
+ sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
+ elif len(payload_in[in_list_idx]) > self.size:
+ self.error = "ERROR Received message too big. Expected:%d, actual:%d" % \
+ (self.size, len(payload_in[in_list_idx]))
+ break
+ else:
+ pass # still accumulating a message
+ else:
+ # socket closed
+ self.keep_running = False
+ if self.keep_running and mask & selectors.EVENT_WRITE:
+ if out_ready_to_send:
+ n_sent = self.sock.send( payload_out[out_list_idx][out_byte_idx:] )
+ total_sent += n_sent
+ out_byte_idx += n_sent
+ if out_byte_idx == self.size:
+ self.logger.log("%s Sent message %d" % (self.prefix, out_list_idx))
+ out_byte_idx = 0
+ out_list_idx += 1
+ sel.modify(self.sock, selectors.EVENT_READ) # turn off write events
+ out_ready_to_send = False # turn on when rcvr receives
else:
- out_ready_to_send = True
- sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
- elif len(payload_in[in_list_idx]) > size:
- error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \
- (size, len(payload_in[in_list_idx]))
- logger.log(error)
- raise Exception(error)
- else:
- pass # still accumulating a message
- else:
- # socket closed
- keep_going = False
- if mask & selectors.EVENT_WRITE:
- if out_ready_to_send:
- n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] )
- total_sent += n_sent
- out_byte_idx += n_sent
- if out_byte_idx == size:
- logger.log("Sent message %d" % out_list_idx)
- out_byte_idx = 0
- out_list_idx += 1
- sel.modify(sock, selectors.EVENT_READ) # turn off write events
- out_ready_to_send = False # turn on when rcvr receives
- else:
- logger.log("DEBUG: ignoring EVENT_WRITE")
-
- # shut down
- sel.unregister(sock)
- sock.close()
+ pass # logger.log("DEBUG: ignoring EVENT_WRITE")
+
+ # shut down
+ sel.unregister(self.sock)
+ self.sock.close()
+
+ except Exception as exc:
+ self.error = "ERROR: exception : '%s'" % traceback.format_exc()
+
+ self.is_running = False
+
+ def wait(self, timeout=TIMEOUT):
+ self.logger.log("%s Client is shutting down" % (self.prefix))
+ self.keep_running = False
+ self._thread.join(timeout)
+
def main(argv):
- try:
- # parse args
- p = argparse.ArgumentParser()
- p.add_argument('--host', '-b',
- help='Required target host')
- p.add_argument('--port', '-p', type=int,
- help='Required target port number')
- p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?',
- help='Size of payload in bytes')
- p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
- help='Number of payloads to process')
- p.add_argument('--name',
- help='Optional logger prefix')
- p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
- help='Timeout in seconds. Default value "0" disables timeouts')
- p.add_argument('--log', '-l',
- action='store_true',
- help='Write activity log to console')
- del argv[0]
- args = p.parse_args(argv)
-
- # host
- if args.host is None:
- raise Exception("User must specify a host")
- host = args.host
-
- # port
- if args.port is None:
- raise Exception("User must specify a port number")
- port = args.port
-
- # size
- if args.size <= 0:
- raise Exception("Size must be greater than zero")
- size = args.size
-
- # count
- if args.count <= 0:
- raise Exception("Count must be greater than zero")
- count = args.count
-
- # name / prefix
- prefix = args.name if args.name is not None else "ECHO_CLIENT"
-
- # timeout
- if args.timeout < 0.0:
- raise Exception("Timeout must be greater than or equal to zero")
+ retval = 0
+ # parse args
+ p = argparse.ArgumentParser()
+ p.add_argument('--host', '-b',
+ help='Required target host')
+ p.add_argument('--port', '-p', type=int,
+ help='Required target port number')
+ p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?',
+ help='Size of payload in bytes')
+ p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
+ help='Number of payloads to process')
+ p.add_argument('--name',
+ help='Optional logger prefix')
+ p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+ help='Timeout in seconds. Default value "0" disables timeouts')
+ p.add_argument('--log', '-l',
+ action='store_true',
+ help='Write activity log to console')
+ del argv[0]
+ args = p.parse_args(argv)
+
+ # host
+ if args.host is None:
+ raise Exception("User must specify a host")
+ host = args.host
+
+ # port
+ if args.port is None:
+ raise Exception("User must specify a port number")
+ port = args.port
+
+ # size
+ if args.size <= 0:
+ raise Exception("Size must be greater than zero")
+ size = args.size
+ # count
+ if args.count <= 0:
+ raise Exception("Count must be greater than zero")
+ count = args.count
+
+ # name / prefix
+ prefix = args.name if args.name is not None else "ECHO_CLIENT (%d_%d_%d)" % \
+ (port, size, count)
+
+ # timeout
+ if args.timeout < 0.0:
+ raise Exception("Timeout must be greater than or equal to zero")
+
+ killer = GracefulKiller()
+ client = None
+
+ try:
# logging
- logger = EchoLogger(prefix=prefix,
- title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
- print_to_console = args.log,
- save_for_dump = False)
+ logger = Logger(title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
+ print_to_console = args.log,
+ save_for_dump = False)
- main_except(host, port, size, count, args.timeout, logger)
- return 0
+ client = TcpEchoClient(prefix, host, port, size, count, args.timeout, logger)
- except KeyboardInterrupt:
- logger.log("Exiting due to KeyboardInterrupt")
- return 0
+ keep_running = True
+ while keep_running:
+ time.sleep(0.1)
+ if client.error is not None:
+ logger.log("%s Client stopped with error: %s" % (prefix, client.error))
+ keep_running = False
+ retval = 1
+ if client.exit_status is not None:
+ logger.log("%s Client stopped with status: %s" % (prefix, client.exit_status))
+ keep_running = False
+ if killer.kill_now:
+ logger.log("%s Process killed with signal" % (prefix))
+ keep_running = False
+ if keep_running and not client.is_running:
+ logger.log("%s Client stopped with no error or status" % (prefix))
+ keep_running = False
except Exception as e:
- traceback.print_exc()
- return 1
+ logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
+ retval = 1
+ return retval
if __name__ == "__main__":
sys.exit(main(sys.argv))
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 28fc2ae..34f72bb 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -19,19 +19,24 @@
# under the License.
#
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
import argparse
import os
import selectors
-from signal import signal, SIGINT
+import signal
import socket
import sys
+from threading import Thread
import time
import traceback
import types
from system_test import Logger
-
-HOST = '127.0.0.1'
+from system_test import TIMEOUT
class ClientRecord(object):
"""
@@ -54,176 +59,234 @@ class ClientRecord(object):
return self.__repr__()
-class EchoLogger(Logger):
- def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
- self.prefix = prefix + ' ' if len(prefix) > 0 else ''
- super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
-
- def log(self, msg):
- super(EchoLogger, self).log(self.prefix + msg)
+class GracefulKiller:
+ kill_now = False
+ def __init__(self):
+ signal.signal(signal.SIGINT, self.exit_gracefully)
+ signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+ def exit_gracefully(self,signum, frame):
+ self.kill_now = True
def split_chunk_for_display(raw_bytes):
"""
Given some raw bytes, return a display string
- Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+ Only show the beginning and end of largish (2x CONTENT_CHUNK_SIZE) arrays.
:param raw_bytes:
:return: display string
"""
- MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too
- if len(raw_bytes) > 2 * MAGIC_SIZE:
- result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+ CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo client, too
+ if len(raw_bytes) > 2 * CONTENT_CHUNK_SIZE:
+ result = repr(raw_bytes[:CONTENT_CHUNK_SIZE]) + " ... " + repr(raw_bytes[-CONTENT_CHUNK_SIZE:])
else:
result = repr(raw_bytes)
return result
-def main_except(sock, port, echo_count, timeout, logger):
- '''
- :param lsock: socket to listen on
- :param port: port to listen on
- :param echo_count: exit after echoing this many bytes
- :param timeout: exit after this many seconds
- :param logger: Logger() object
- :return:
- '''
- # set up spontaneous exit settings
- start_time = time.time()
- total_echoed = 0
-
- # set up listening socket
- sock.bind((HOST, port))
- sock.listen()
- sock.setblocking(False)
- logger.log('Listening on host:%s, port:%d' % (HOST, port))
-
- # set up selector
- sel = selectors.DefaultSelector()
- sel.register(sock, selectors.EVENT_READ, data=None)
-
- # event loop
- while True:
- if timeout > 0.0:
- elapsed = time.time() - start_time
- if elapsed > timeout:
- logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed)
- break
- if echo_count > 0:
- if total_echoed >= echo_count:
- logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed)
- break
- events = sel.select(timeout=0.1)
- if events:
- for key, mask in events:
- if key.data is None:
- if key.fileobj is sock:
- do_accept(key.fileobj, sel, logger)
- else:
- raise Exception("Only listener 'sock' has None in opaque data field")
+class TcpEchoServer():
+ def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None):
+ '''
+ Start echo server in separate thread
+
+ :param prefix: log prefix
+ :param port: port to listen on
+ :param echo_count: exit after echoing this many bytes
+ :param timeout: exit after this many seconds
+ :param logger: Logger() object
+ :return:
+ '''
+ self.sock = None
+ self.prefix = prefix
+ self.port = port
+ self.echo_count = echo_count
+ self.timeout = timeout
+ self.logger = logger
+ self.keep_running = True
+ self.HOST = '127.0.0.1'
+ self.is_running = False
+ self.exit_status = None
+ self.error = None
+ self._thread = Thread(target=self.run)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def run(self):
+ """
+ Run server in daemon thread
+ :return:
+ """
+ try:
+ # set up spontaneous exit settings
+ self.is_running = True
+ start_time = time.time()
+ total_echoed = 0
+
+ # set up listening socket
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.bind((self.HOST, self.port))
+ self.sock.listen()
+ self.sock.setblocking(False)
+ self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port))
+
+ # set up selector
+ sel = selectors.DefaultSelector()
+ sel.register(self.sock, selectors.EVENT_READ, data=None)
+
+ # event loop
+ while True:
+ if not self.keep_running:
+ self.exit_status = "INFO: command shutdown:"
+ break
+ if self.timeout > 0.0:
+ elapsed = time.time() - start_time
+ if elapsed > self.timeout:
+ self.exit_status = "Exiting due to timeout. Total echoed = %d" % total_echoed
+ break
+ if self.echo_count > 0:
+ if total_echoed >= self.echo_count:
+ self.exit_status = "Exiting due to echo byte count. Total echoed = %d" % total_echoed
+ break
+ events = sel.select(timeout=0.1)
+ if events:
+ for key, mask in events:
+ if key.data is None:
+ if key.fileobj is self.sock:
+ self.do_accept(key.fileobj, sel, self.logger)
+ else:
+ pass # Only listener 'sock' has None in opaque data field
+ else:
+ total_echoed += self.do_service(key, mask, sel, self.logger)
+ else:
+ pass # select timeout. probably.
+
+ sel.unregister(self.sock)
+ self.sock.close()
+
+ except Exception as exc:
+ self.error = "ERROR: exception : '%s'" % traceback.format_exc()
+
+ self.is_running = False
+
+ def do_accept(self, sock, sel, logger):
+ conn, addr = sock.accept()
+ logger.log('%s Accepted connection from %s:%d' % (self.prefix, addr[0], addr[1]))
+ conn.setblocking(False)
+ events = selectors.EVENT_READ | selectors.EVENT_WRITE
+ sel.register(conn, events, data=ClientRecord(addr))
+
+ def do_service(self, key, mask, sel, logger):
+ retval = 0
+ sock = key.fileobj
+ data = key.data
+ if mask & selectors.EVENT_READ:
+ try:
+ recv_data = sock.recv(1024)
+ except ConnectionResetError as exc:
+ logger.log('%s Connection to %s:%d closed by peer' % (self.prefix, data.addr[0], data.addr[1]))
+ sel.unregister(sock)
+ sock.close()
+ return 0
+ if recv_data:
+ data.outb += recv_data
+ logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data),
+ split_chunk_for_display(recv_data)))
+ sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
+ else:
+ logger.log('%s Closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1]))
+ sel.unregister(sock)
+ sock.close()
+ return 0
+ if mask & selectors.EVENT_WRITE:
+ if data.outb:
+ sent = sock.send(data.outb)
+ retval += sent
+ if sent > 0:
+ logger.log('%s write to : %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], sent,
+ split_chunk_for_display(data.outb[:sent])))
else:
- total_echoed += do_service(key, mask, sel, logger)
- else:
- pass # select timeout. probably.
+ logger.log('%s write to : %s:%d len:0' % (self.prefix, data.addr[0], data.addr[1]))
+ data.outb = data.outb[sent:]
+ else:
+ sel.modify(sock, selectors.EVENT_READ, data=data)
+ return retval
- sel.unregister(sock)
- sock.close()
+ def wait(self, timeout=TIMEOUT):
+ self.logger.log("%s Server is shutting down" % (self.prefix))
+ self.keep_running = False
+ self._thread.join(timeout)
-def do_accept(sock, sel, logger):
- conn, addr = sock.accept()
- logger.log('Accepted connection from %s:%d' % (addr[0], addr[1]))
- conn.setblocking(False)
- events = selectors.EVENT_READ | selectors.EVENT_WRITE
- sel.register(conn, events, data=ClientRecord(addr))
-
-def do_service(key, mask, sel, logger):
- retval = 0
- sock = key.fileobj
- data = key.data
- if mask & selectors.EVENT_READ:
- recv_data = sock.recv(1024)
- if recv_data:
- data.outb += recv_data
- logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data),
- split_chunk_for_display(recv_data)))
- sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
- else:
- logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
- sel.unregister(sock)
- sock.close()
- return 0
- if mask & selectors.EVENT_WRITE:
- if data.outb:
- sent = sock.send(data.outb)
- retval += sent
- if sent > 0:
- logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent,
- split_chunk_for_display(data.outb[:sent])))
- else:
- logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
- data.outb = data.outb[sent:]
- else:
- #logger.log('write event with no data' + str(data))
- sel.modify(sock, selectors.EVENT_READ, data=data)
- return retval
def main(argv):
retval = 0
- try:
- # parse args
- p = argparse.ArgumentParser()
- p.add_argument('--port', '-p',
- help='Required listening port number')
- p.add_argument('--name',
- help='Optional logger prefix')
- p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
- help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
- p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
- help='Timeout in seconds. Default value "0" disables timeouts')
- p.add_argument('--log', '-l',
- action='store_true',
- help='Write activity log to console')
- del argv[0]
- args = p.parse_args(argv)
-
- lsock = None
-
- # port
- if args.port is None:
- raise Exception("User must specify a port number")
- port = int(args.port)
-
- # name / prefix
- prefix = args.name if args.name is not None else "ECHO_SERVER"
-
- # echo
- if args.echo < 0:
- raise Exception("Echo count must be greater than zero")
-
- # timeout
- if args.timeout < 0.0:
- raise Exception("Timeout must be greater than or equal to zero")
+ # parse args
+ p = argparse.ArgumentParser()
+ p.add_argument('--port', '-p',
+ help='Required listening port number')
+ p.add_argument('--name',
+ help='Optional logger prefix')
+ p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
+ help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.')
+ p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+ help='Timeout in seconds. Default value "0" disables timeouts')
+ p.add_argument('--log', '-l',
+ action='store_true',
+ help='Write activity log to console')
+ del argv[0]
+ args = p.parse_args(argv)
+
+ # port
+ if args.port is None:
+ raise Exception("User must specify a port number")
+ port = int(args.port)
+
+ # name / prefix
+ prefix = args.name if args.name is not None else "ECHO_SERVER (%s)" % (str(port))
+
+ # echo
+ if args.echo < 0:
+ raise Exception("Echo count must be greater than zero")
+
+ # timeout
+ if args.timeout < 0.0:
+ raise Exception("Timeout must be greater than or equal to zero")
+
+ killer = GracefulKiller()
+ server = None
+ try:
# logging
- logger = EchoLogger(prefix = prefix,
- title = "%s port %d" % (prefix, port),
- print_to_console = args.log,
- save_for_dump = False)
+ logger = Logger(title = "%s port %d" % (prefix, port),
+ print_to_console = args.log,
+ save_for_dump = False)
+
+ server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger)
+
+ keep_running = True
+ while keep_running:
+ time.sleep(0.1)
+ if server.error is not None:
+ logger.log("%s Server stopped with error: %s" % (prefix, server.error))
+ keep_running = False
+ retval = 1
+ if server.exit_status is not None:
+ logger.log("%s Server stopped with status: %s" % (prefix, server.exit_status))
+ keep_running = False
+ if killer.kill_now:
+ logger.log("%s Process killed with signal" % (prefix))
+ keep_running = False
+ if keep_running and not server.is_running:
+ logger.log("%s Server stopped with no error or status" % (prefix))
+ keep_running = False
- # the listening socket
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- main_except(lsock, port, args.echo, args.timeout, logger)
-
- except KeyboardInterrupt:
- logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed)
except Exception as e:
- logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed)
- traceback.print_exc()
+ logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
retval = 1
- if lsock is not None:
- lsock.close()
+ if server is not None and server.sock is not None:
+ server.sock.close()
+
return retval
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 8120937..c2df88d 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -23,38 +23,24 @@ from __future__ import absolute_import
from __future__ import print_function
import os
+import traceback
from time import sleep
from threading import Event
from threading import Timer
-from proton import Message, Timeout
-from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout
-from system_test import AsyncTestReceiver
-from system_test import AsyncTestSender
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
from system_test import Logger
from system_test import QdManager
from system_test import unittest
from system_test import Process
-from system_tests_link_routes import ConnLinkRouteService
-from test_broker import FakeBroker
-from test_broker import FakeService
-from proton.handlers import MessagingHandler
-from proton.reactor import Container, DynamicNodeProperties
-from proton.utils import BlockingConnection
+from system_test import DIR
from qpid_dispatch.management.client import Node
-from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments
from subprocess import PIPE, STDOUT
-import re
+from TCP_echo_client import TcpEchoClient
+from TCP_echo_server import TcpEchoServer
-class AddrTimer(object):
- def __init__(self, parent):
- self.parent = parent
- def on_timer_task(self, event):
- self.parent.check_address()
-
-
-class TcpAdaptorOneRouterEcho(TestCase):
+class TcpAdaptorOneRouterEcho(TestCase, Process):
"""
Run echo tests through a stand-alone router
"""
@@ -64,7 +50,7 @@ class TcpAdaptorOneRouterEcho(TestCase):
@classmethod
def setUpClass(cls):
- """Start a router and echo server"""
+ """Start a router"""
super(TcpAdaptorOneRouterEcho, cls).setUpClass()
def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None):
@@ -92,54 +78,87 @@ class TcpAdaptorOneRouterEcho(TestCase):
cls.tcp_client_listener_port = cls.tester.get_port()
cls.tcp_server_listener_port = cls.tester.get_port()
- router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
+ router('A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
cls.tcp_server_listener_port, "some_address", "best_site")
- cls.logger = Logger(title="TCP echo one router", print_to_console=True)
-
- @classmethod
- def tearDownClass(cls):
- pass
-
- def spawn_echo_server(self, port, expect=None):
- cmd = ["TCP_echo_server.py",
- "--port", str(port),
- "--log"]
- return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect,
- universal_newlines=True)
-
- def spawn_echo_client(self, logger, host, port, size, count, expect=None):
- if expect is None:
- expect = Process.EXIT_OK
- cmd = ["TCP_echo_client.py",
- "--host", host,
- "--port", str(port),
- "--size", str(size),
- "--count", str(count),
- "--log"]
- logger.log("Start client. cmd=%s" % str(cmd))
- return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect,
- universal_newlines=True)
-
- def do_test_echo(self, logger, host, port, size, count):
- # start echo client
- client = self.spawn_echo_client(logger, host, port, size, count)
- cl_text, cl_error = client.communicate(timeout=TIMEOUT)
- if client.returncode:
- raise Exception("Echo client failed size:%d, count:%d : %s %s" %
- (size, count, cl_text, cl_error))
+ cls.logger = Logger(title="TcpAdaptorOneRouterEcho-testClass", print_to_console=True)
+
+ def do_test_echo(self, test_name, logger, host, port, size, count):
+ # Run echo client. Return true if it works.
+ name = "%s_%s_%s_%s" % (test_name, port, size, count)
+ client_prefix = "ECHO_CLIENT %s" % name
+ client_logger = Logger(title=client_prefix, print_to_console=False, save_for_dump=True)
+ result = True # assume it works
+ try:
+ # start client
+ client = TcpEchoClient(prefix=client_prefix,
+ host=host,
+ port=port,
+ size=size,
+ count=count,
+ timeout=TIMEOUT,
+ logger=client_logger)
+ #assert client.is_running
+
+ # wait for client to finish
+ keep_waiting = True
+ while keep_waiting:
+ sleep(0.1)
+ if client.error is not None:
+ logger.log("%s Client stopped with error: %s" % (name, client.error))
+ keep_waiting = False
+ result = False
+ if client.exit_status is not None:
+ logger.log("%s Client stopped with status: %s" % (name, client.exit_status))
+ keep_waiting = False
+ result = False
+ if keep_waiting and not client.is_running:
+ logger.log("%s Client stopped with no error or status" % (name))
+ keep_waiting = False
+
+ except Exception as exc:
+ logger.log("EchoClient %s failed. Exception: %s" %
+ (name, traceback.format_exc()))
+ result = False
+
+ if not result:
+ # On failure, dump the client log through the test log. Compound logs here we go
+ for line in client_logger.logs:
+ logger.log("Failed client log: %s" % line)
+ return result
def test_01_tcp_echo_one_router(self):
+ """
+ Run one echo server.
+ Run many echo clients.
+ :return:
+ """
# start echo server
- #server = self.spawn_echo_server(self.tcp_server_listener_port)
-
- #for size in [1, 5, 10, 50, 100]:
- # for count in [1, 5, 10, 50, 100]:
- # self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" %
- # (self.tcp_client_listener_port, size, count))
- # self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count)
- #server.join()
- pass
+ test_name = "test_01_tcp_echo_one_router"
+ server_prefix = "ECHO_SERVER %s" % test_name
+ server_logger = Logger(title=test_name, print_to_console=False, save_for_dump=True)
+ server = TcpEchoServer(prefix=server_prefix,
+ port=self.tcp_server_listener_port,
+ timeout=TIMEOUT,
+ logger=server_logger)
+ assert server.is_running
+
+ # run series of clients to test
+ result = True
+ for size in [1]:
+ for count in [1]:
+ test_info = "Starting echo client %s host:localhost, port:%d, size:%d, count:%d" % \
+ (test_name, self.tcp_client_listener_port, size, count)
+ self.logger.log(test_info)
+ result = self.do_test_echo(test_name, self.logger, "localhost",
+ self.tcp_client_listener_port, size, count)
+ if not result:
+ break
+ if not result:
+ break
+ # stop echo server
+ server.wait()
+ assert result, "Test case failed %s" % test_info
if __name__== '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org