You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/27 19:07:27 UTC

[qpid-dispatch] 02/02: DISPATCH-1807: Add TCP protocol adaptor tests

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 2d074ac41800f18a9549f97b82e49911027607ec
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 15:04:06 2020 -0400

    DISPATCH-1807: Add TCP protocol adaptor tests
    
    Rewrite do-nothing tcp_adaptor test
    
        * Mistakenly committed test cmake including it before it's ready
        * It's still not ready but is finding some form
    
    Improve echo server
    
        * For messages larger than 100 bytes only print the first and
          last 50 bytes to logs.
    
    Improve echo client
    
        * Send unique data for each message
        * Improve logging
        * Add timeout
        * Don't use socket after closing it
---
 tests/TCP_echo_client.py          | 102 ++++++++++++++++++++++++---
 tests/TCP_echo_server.py          |  29 ++++++--
 tests/system_tests_tcp_adaptor.py | 145 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 262 insertions(+), 14 deletions(-)

diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 83f1b44..5f2a687 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -24,12 +24,38 @@ import os
 import selectors
 import socket
 import sys
+import time
 import traceback
 import types
 
 from system_test import Logger
 
-def main_except(host, port, size, count, logger):
+
+class EchoLogger(Logger):
+    def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
+        self.prefix = prefix + ' ' if len(prefix) > 0 else ''
+        super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
+
+    def log(self, msg):
+        super(EchoLogger, self).log(self.prefix + msg)
+
+
+def split_chunk_for_display(raw_bytes):
+    """
+    Given some raw bytes, return a display string
+    Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+    :param raw_bytes:
+    :return: display string
+    """
+    MAGIC_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
+    if len(raw_bytes) > 2 * MAGIC_SIZE:
+        result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+    else:
+        result = repr(raw_bytes)
+    return result
+
+
+def main_except(host, port, size, count, timeout, logger):
     '''
     :param host: connect to this host
     :param port: connect to this port
@@ -41,16 +67,36 @@ def main_except(host, port, size, count, logger):
     :return:
     '''
     # Start up
+    start_time = time.time()
     logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count))
     keep_going = True
+    total_sent = 0
+    total_rcvd = 0
 
     # outbound payload
     payload_out = []
     out_list_idx = 0  # current _out array being sent
     out_byte_idx = 0  # next-to-send in current array
     out_ready_to_send = True
-    for i in range(count):
-        payload_out.append(bytearray([i & 255] * size))
+    # Generate unique content for each message so you can tell where the message
+    # or fragment belongs in the whole stream. Chunks look like:
+    #    b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
+    #    host: localhost
+    #    port: 33333
+    #    index: 6
+    #    offset into message: 0
+    MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
+    for idx in range(count):
+        body_msg = ""
+        padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+        while len(body_msg) < size:
+            chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg))
+            padlen = MAGIC_SIZE - len(chunk)
+            chunk += padchar * padlen
+            body_msg += chunk
+        if len(body_msg) > size:
+            body_msg = body_msg[:size]
+        payload_out.append(bytearray(body_msg.encode()))
     # incoming payloads
     payload_in  = []
     in_list_idx = 0 # current _in array being received
@@ -70,11 +116,17 @@ def main_except(host, port, size, count, logger):
 
     # event loop
     while keep_going:
-        for key, mask in sel.select(timeout=1):
+        if timeout > 0.0:
+            elapsed = time.time() - start_time
+            if elapsed > timeout:
+                logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd))
+                break
+        for key, mask in sel.select(timeout=0.1):
             sock = key.fileobj
             if mask & selectors.EVENT_READ:
                 recv_data = sock.recv(1024)
                 if recv_data:
+                    total_rcvd = len(recv_data)
                     payload_in[in_list_idx].extend(recv_data)
                     if len(payload_in[in_list_idx]) == size:
                         logger.log("Rcvd message %d" % in_list_idx)
@@ -82,6 +134,16 @@ def main_except(host, port, size, count, logger):
                         if in_list_idx == count:
                             # Received all bytes of all chunks - done.
                             keep_going = False
+                            # Verify the received data
+                            for idxc in range(count):
+                                for idxs in range(size):
+                                    ob = payload_out[idxc][idxs]
+                                    ib = payload_in [idxc][idxs]
+                                    if ob != ib:
+                                        error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
+                                                % (idxc, idxs, repr(ob), repr(ib))
+                                        logger.log(error)
+                                        raise Exception(error)
                         else:
                             out_ready_to_send = True
                             sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
@@ -89,7 +151,7 @@ def main_except(host, port, size, count, logger):
                         error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \
                                 (size, len(payload_in[in_list_idx]))
                         logger.log(error)
-                        keep_going = False
+                        raise Exception(error)
                     else:
                         pass # still accumulating a message
                 else:
@@ -98,6 +160,7 @@ def main_except(host, port, size, count, logger):
             if mask & selectors.EVENT_WRITE:
                 if out_ready_to_send:
                     n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] )
+                    total_sent += n_sent
                     out_byte_idx += n_sent
                     if out_byte_idx == size:
                         logger.log("Sent message %d" % out_list_idx)
@@ -124,6 +187,10 @@ def main(argv):
                        help='Size of payload in bytes')
         p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
                        help='Number of payloads to process')
+        p.add_argument('--name',
+                       help='Optional logger prefix')
+        p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+                       help='Timeout in seconds. Default value "0" disables timeouts')
         p.add_argument('--log', '-l',
                        action='store_true',
                        help='Write activity log to console')
@@ -141,18 +208,35 @@ def main(argv):
         port = args.port
 
         # size
+        if args.size <= 0:
+            raise Exception("Size must be greater than zero")
         size = args.size
 
         # count
+        if args.count <= 0:
+            raise Exception("Count must be greater than zero")
         count = args.count
 
+        # name / prefix
+        prefix = args.name if args.name is not None else "ECHO_CLIENT"
+
+        # timeout
+        if args.timeout < 0.0:
+            raise Exception("Timeout must be greater than or equal to zero")
+
         # logging
-        logger = Logger(title = "TCP_echo_client host:%s port %d size:%d count:%d" % (host, port, size, count),
-                        print_to_console = args.log,
-                        save_for_dump = False)
+        logger = EchoLogger(prefix=prefix,
+                            title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
+                            print_to_console = args.log,
+                            save_for_dump = False)
 
-        main_except(host, port, size, count, logger)
+        main_except(host, port, size, count, args.timeout, logger)
         return 0
+
+    except KeyboardInterrupt:
+        logger.log("Exiting due to KeyboardInterrupt")
+        return 0
+
     except Exception as e:
         traceback.print_exc()
         return 1
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 44aefd7..28fc2ae 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -63,6 +63,21 @@ class EchoLogger(Logger):
         super(EchoLogger, self).log(self.prefix + msg)
 
 
+def split_chunk_for_display(raw_bytes):
+    """
+    Given some raw bytes, return a display string
+    Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+    :param raw_bytes:
+    :return: display string
+    """
+    MAGIC_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
+    if len(raw_bytes) > 2 * MAGIC_SIZE:
+        result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+    else:
+        result = repr(raw_bytes)
+    return result
+
+
 def main_except(sock, port, echo_count, timeout, logger):
     '''
     :param lsock: socket to listen on
@@ -128,23 +143,26 @@ def do_service(key, mask, sel, logger):
         recv_data = sock.recv(1024)
         if recv_data:
             data.outb += recv_data
-            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data),
+                                                        split_chunk_for_display(recv_data)))
             sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
         else:
             logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
             sel.unregister(sock)
             sock.close()
+            return 0
     if mask & selectors.EVENT_WRITE:
         if data.outb:
             sent = sock.send(data.outb)
             retval += sent
             if sent > 0:
-                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
+                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent,
+                                                            split_chunk_for_display(data.outb[:sent])))
             else:
                 logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
             data.outb = data.outb[sent:]
         else:
-            logger.log('write event with no data' + str(data))
+            #logger.log('write event with no data' + str(data))
             sel.modify(sock, selectors.EVENT_READ, data=data)
     return retval
 
@@ -183,7 +201,7 @@ def main(argv):
 
         # timeout
         if args.timeout < 0.0:
-            raise Exception("Timeout must be greater than zero")
+            raise Exception("Timeout must be greater than or equal to zero")
 
         # logging
         logger = EchoLogger(prefix = prefix,
@@ -197,9 +215,10 @@ def main(argv):
         main_except(lsock, port, args.echo, args.timeout, logger)
 
     except KeyboardInterrupt:
-        pass
+        logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed)
 
     except Exception as e:
+        logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed)
         traceback.print_exc()
         retval = 1
 
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
new file mode 100644
index 0000000..8120937
--- /dev/null
+++ b/tests/system_tests_tcp_adaptor.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+from time import sleep
+from threading import Event
+from threading import Timer
+
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import Logger
+from system_test import QdManager
+from system_test import unittest
+from system_test import Process
+from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeBroker
+from test_broker import FakeService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments
+from subprocess import PIPE, STDOUT
+import re
+
+class AddrTimer(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.check_address()
+
+
+class TcpAdaptorOneRouterEcho(TestCase):
+    """
+    Run echo tests through a stand-alone router
+    """
+    amqp_listener_port       = None
+    tcp_client_listener_port = None
+    tcp_server_listener_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and echo server"""
+        super(TcpAdaptorOneRouterEcho, cls).setUpClass()
+
+        def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': l_amqp, 'stripAnnotations': 'no'}),
+                ('tcpConnector', {"host": "127.0.0.1",
+                                  "port": l_tcp_server,
+                                  "address": addr,
+                                  "siteId": site}),
+                ('tcpListener', {"host": "0.0.0.0",
+                                 "port": l_tcp_client,
+                                 "address": addr,
+                                 "siteId": site})
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        cls.amqp_listener_port       = cls.tester.get_port()
+        cls.tcp_client_listener_port = cls.tester.get_port()
+        cls.tcp_server_listener_port = cls.tester.get_port()
+
+        router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
+               cls.tcp_server_listener_port, "some_address", "best_site")
+
+        cls.logger = Logger(title="TCP echo one router", print_to_console=True)
+
+    @classmethod
+    def tearDownClass(cls):
+        pass
+
+    def spawn_echo_server(self, port, expect=None):
+        cmd = ["TCP_echo_server.py",
+               "--port", str(port),
+               "--log"]
+        return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect,
+                          universal_newlines=True)
+
+    def spawn_echo_client(self, logger, host, port, size, count, expect=None):
+        if expect is None:
+            expect = Process.EXIT_OK
+        cmd = ["TCP_echo_client.py",
+               "--host", host,
+               "--port", str(port),
+               "--size", str(size),
+               "--count", str(count),
+               "--log"]
+        logger.log("Start client. cmd=%s" % str(cmd))
+        return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect,
+                          universal_newlines=True)
+
+    def do_test_echo(self, logger, host, port, size, count):
+        # start echo client
+        client = self.spawn_echo_client(logger, host, port, size, count)
+        cl_text, cl_error = client.communicate(timeout=TIMEOUT)
+        if client.returncode:
+            raise Exception("Echo client failed size:%d, count:%d : %s %s" %
+                            (size, count, cl_text, cl_error))
+
+    def test_01_tcp_echo_one_router(self):
+        # start echo server
+        #server = self.spawn_echo_server(self.tcp_server_listener_port)
+
+        #for size in [1, 5, 10, 50, 100]:
+        #    for count in [1, 5, 10, 50, 100]:
+        #        self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" %
+        #                   (self.tcp_client_listener_port, size, count))
+        #        self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count)
+        #server.join()
+        pass
+
+if __name__== '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org