You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/27 19:07:25 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated (d338f85 -> 2d074ac)

This is an automated email from the ASF dual-hosted git repository.

chug pushed a change to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from d338f85  NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix
     new 76c00c9  Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix"
     new 2d074ac  DISPATCH-1807: Add TCP protocol adaptor tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/TCP_echo_client.py          | 102 ++++++++++++++++++++++++---
 tests/TCP_echo_server.py          |  29 ++++++--
 tests/system_tests_tcp_adaptor.py | 145 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 262 insertions(+), 14 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/02: Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 76c00c978c10687300fbefaff29fa70fca4d84b3
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 15:03:33 2020 -0400

    Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix"
    
    This reverts commit d338f851bcaa9259606ff067a659f80b884a77b4.
---
 tests/system_tests_tcp_adaptor.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
deleted file mode 100644
index e69de29..0000000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/02: DISPATCH-1807: Add TCP protocol adaptor tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 2d074ac41800f18a9549f97b82e49911027607ec
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 27 15:04:06 2020 -0400

    DISPATCH-1807: Add TCP protocol adaptor tests
    
    Rewrite do-nothing tcp_adaptor test
    
        * Mistakenly committed test cmake including it before it's ready
        * It's still not ready but is finding some form
    
    Improve echo server
    
        * For messages larger than 100 bytes only print the first and
          last 50 bytes to logs.
    
    Improve echo client
    
        * Send unique data for each message
        * Improve logging
        * Add timeout
        * Don't use socket after closing it
---
 tests/TCP_echo_client.py          | 102 ++++++++++++++++++++++++---
 tests/TCP_echo_server.py          |  29 ++++++--
 tests/system_tests_tcp_adaptor.py | 145 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 262 insertions(+), 14 deletions(-)

diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 83f1b44..5f2a687 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -24,12 +24,38 @@ import os
 import selectors
 import socket
 import sys
+import time
 import traceback
 import types
 
 from system_test import Logger
 
-def main_except(host, port, size, count, logger):
+
+class EchoLogger(Logger):
+    def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False):
+        self.prefix = prefix + ' ' if len(prefix) > 0 else ''
+        super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump)
+
+    def log(self, msg):
+        super(EchoLogger, self).log(self.prefix + msg)
+
+
+def split_chunk_for_display(raw_bytes):
+    """
+    Given some raw bytes, return a display string
+    Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+    :param raw_bytes:
+    :return: display string
+    """
+    MAGIC_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
+    if len(raw_bytes) > 2 * MAGIC_SIZE:
+        result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+    else:
+        result = repr(raw_bytes)
+    return result
+
+
+def main_except(host, port, size, count, timeout, logger):
     '''
     :param host: connect to this host
     :param port: connect to this port
@@ -41,16 +67,36 @@ def main_except(host, port, size, count, logger):
     :return:
     '''
     # Start up
+    start_time = time.time()
     logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count))
     keep_going = True
+    total_sent = 0
+    total_rcvd = 0
 
     # outbound payload
     payload_out = []
     out_list_idx = 0  # current _out array being sent
     out_byte_idx = 0  # next-to-send in current array
     out_ready_to_send = True
-    for i in range(count):
-        payload_out.append(bytearray([i & 255] * size))
+    # Generate unique content for each message so you can tell where the message
+    # or fragment belongs in the whole stream. Chunks look like:
+    #    b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg'
+    #    host: localhost
+    #    port: 33333
+    #    index: 6
+    #    offset into message: 0
+    MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too
+    for idx in range(count):
+        body_msg = ""
+        padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+        while len(body_msg) < size:
+            chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg))
+            padlen = MAGIC_SIZE - len(chunk)
+            chunk += padchar * padlen
+            body_msg += chunk
+        if len(body_msg) > size:
+            body_msg = body_msg[:size]
+        payload_out.append(bytearray(body_msg.encode()))
     # incoming payloads
     payload_in  = []
     in_list_idx = 0 # current _in array being received
@@ -70,11 +116,17 @@ def main_except(host, port, size, count, logger):
 
     # event loop
     while keep_going:
-        for key, mask in sel.select(timeout=1):
+        if timeout > 0.0:
+            elapsed = time.time() - start_time
+            if elapsed > timeout:
+                logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd))
+                break
+        for key, mask in sel.select(timeout=0.1):
             sock = key.fileobj
             if mask & selectors.EVENT_READ:
                 recv_data = sock.recv(1024)
                 if recv_data:
+                    total_rcvd = len(recv_data)
                     payload_in[in_list_idx].extend(recv_data)
                     if len(payload_in[in_list_idx]) == size:
                         logger.log("Rcvd message %d" % in_list_idx)
@@ -82,6 +134,16 @@ def main_except(host, port, size, count, logger):
                         if in_list_idx == count:
                             # Received all bytes of all chunks - done.
                             keep_going = False
+                            # Verify the received data
+                            for idxc in range(count):
+                                for idxs in range(size):
+                                    ob = payload_out[idxc][idxs]
+                                    ib = payload_in [idxc][idxs]
+                                    if ob != ib:
+                                        error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
+                                                % (idxc, idxs, repr(ob), repr(ib))
+                                        logger.log(error)
+                                        raise Exception(error)
                         else:
                             out_ready_to_send = True
                             sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
@@ -89,7 +151,7 @@ def main_except(host, port, size, count, logger):
                         error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \
                                 (size, len(payload_in[in_list_idx]))
                         logger.log(error)
-                        keep_going = False
+                        raise Exception(error)
                     else:
                         pass # still accumulating a message
                 else:
@@ -98,6 +160,7 @@ def main_except(host, port, size, count, logger):
             if mask & selectors.EVENT_WRITE:
                 if out_ready_to_send:
                     n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] )
+                    total_sent += n_sent
                     out_byte_idx += n_sent
                     if out_byte_idx == size:
                         logger.log("Sent message %d" % out_list_idx)
@@ -124,6 +187,10 @@ def main(argv):
                        help='Size of payload in bytes')
         p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?',
                        help='Number of payloads to process')
+        p.add_argument('--name',
+                       help='Optional logger prefix')
+        p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?",
+                       help='Timeout in seconds. Default value "0" disables timeouts')
         p.add_argument('--log', '-l',
                        action='store_true',
                        help='Write activity log to console')
@@ -141,18 +208,35 @@ def main(argv):
         port = args.port
 
         # size
+        if args.size <= 0:
+            raise Exception("Size must be greater than zero")
         size = args.size
 
         # count
+        if args.count <= 0:
+            raise Exception("Count must be greater than zero")
         count = args.count
 
+        # name / prefix
+        prefix = args.name if args.name is not None else "ECHO_CLIENT"
+
+        # timeout
+        if args.timeout < 0.0:
+            raise Exception("Timeout must be greater than or equal to zero")
+
         # logging
-        logger = Logger(title = "TCP_echo_client host:%s port %d size:%d count:%d" % (host, port, size, count),
-                        print_to_console = args.log,
-                        save_for_dump = False)
+        logger = EchoLogger(prefix=prefix,
+                            title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count),
+                            print_to_console = args.log,
+                            save_for_dump = False)
 
-        main_except(host, port, size, count, logger)
+        main_except(host, port, size, count, args.timeout, logger)
         return 0
+
+    except KeyboardInterrupt:
+        logger.log("Exiting due to KeyboardInterrupt")
+        return 0
+
     except Exception as e:
         traceback.print_exc()
         return 1
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 44aefd7..28fc2ae 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -63,6 +63,21 @@ class EchoLogger(Logger):
         super(EchoLogger, self).log(self.prefix + msg)
 
 
+def split_chunk_for_display(raw_bytes):
+    """
+    Given some raw bytes, return a display string
+    Only show the beginning and end of largish (2xMAGIC_SIZE) arrays.
+    :param raw_bytes:
+    :return: display string
+    """
+    MAGIC_SIZE = 50  # Content repeats after chunks this big - used by echo client, too
+    if len(raw_bytes) > 2 * MAGIC_SIZE:
+        result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:])
+    else:
+        result = repr(raw_bytes)
+    return result
+
+
 def main_except(sock, port, echo_count, timeout, logger):
     '''
     :param lsock: socket to listen on
@@ -128,23 +143,26 @@ def do_service(key, mask, sel, logger):
         recv_data = sock.recv(1024)
         if recv_data:
             data.outb += recv_data
-            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data)))
+            logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data),
+                                                        split_chunk_for_display(recv_data)))
             sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
         else:
             logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1]))
             sel.unregister(sock)
             sock.close()
+            return 0
     if mask & selectors.EVENT_WRITE:
         if data.outb:
             sent = sock.send(data.outb)
             retval += sent
             if sent > 0:
-                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent])))
+                logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent,
+                                                            split_chunk_for_display(data.outb[:sent])))
             else:
                 logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1]))
             data.outb = data.outb[sent:]
         else:
-            logger.log('write event with no data' + str(data))
+            #logger.log('write event with no data' + str(data))
             sel.modify(sock, selectors.EVENT_READ, data=data)
     return retval
 
@@ -183,7 +201,7 @@ def main(argv):
 
         # timeout
         if args.timeout < 0.0:
-            raise Exception("Timeout must be greater than zero")
+            raise Exception("Timeout must be greater than or equal to zero")
 
         # logging
         logger = EchoLogger(prefix = prefix,
@@ -197,9 +215,10 @@ def main(argv):
         main_except(lsock, port, args.echo, args.timeout, logger)
 
     except KeyboardInterrupt:
-        pass
+        logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed)
 
     except Exception as e:
+        logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed)
         traceback.print_exc()
         retval = 1
 
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
new file mode 100644
index 0000000..8120937
--- /dev/null
+++ b/tests/system_tests_tcp_adaptor.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import os
+from time import sleep
+from threading import Event
+from threading import Timer
+
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import Logger
+from system_test import QdManager
+from system_test import unittest
+from system_test import Process
+from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeBroker
+from test_broker import FakeService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments
+from subprocess import PIPE, STDOUT
+import re
+
+class AddrTimer(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.check_address()
+
+
+class TcpAdaptorOneRouterEcho(TestCase):
+    """
+    Run echo tests through a stand-alone router
+    """
+    amqp_listener_port       = None
+    tcp_client_listener_port = None
+    tcp_server_listener_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and echo server"""
+        super(TcpAdaptorOneRouterEcho, cls).setUpClass()
+
+        def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': l_amqp, 'stripAnnotations': 'no'}),
+                ('tcpConnector', {"host": "127.0.0.1",
+                                  "port": l_tcp_server,
+                                  "address": addr,
+                                  "siteId": site}),
+                ('tcpListener', {"host": "0.0.0.0",
+                                 "port": l_tcp_client,
+                                 "address": addr,
+                                 "siteId": site})
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        cls.amqp_listener_port       = cls.tester.get_port()
+        cls.tcp_client_listener_port = cls.tester.get_port()
+        cls.tcp_server_listener_port = cls.tester.get_port()
+
+        router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port,
+               cls.tcp_server_listener_port, "some_address", "best_site")
+
+        cls.logger = Logger(title="TCP echo one router", print_to_console=True)
+
+    @classmethod
+    def tearDownClass(cls):
+        pass
+
+    def spawn_echo_server(self, port, expect=None):
+        cmd = ["TCP_echo_server.py",
+               "--port", str(port),
+               "--log"]
+        return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect,
+                          universal_newlines=True)
+
+    def spawn_echo_client(self, logger, host, port, size, count, expect=None):
+        if expect is None:
+            expect = Process.EXIT_OK
+        cmd = ["TCP_echo_client.py",
+               "--host", host,
+               "--port", str(port),
+               "--size", str(size),
+               "--count", str(count),
+               "--log"]
+        logger.log("Start client. cmd=%s" % str(cmd))
+        return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect,
+                          universal_newlines=True)
+
+    def do_test_echo(self, logger, host, port, size, count):
+        # start echo client
+        client = self.spawn_echo_client(logger, host, port, size, count)
+        cl_text, cl_error = client.communicate(timeout=TIMEOUT)
+        if client.returncode:
+            raise Exception("Echo client failed size:%d, count:%d : %s %s" %
+                            (size, count, cl_text, cl_error))
+
+    def test_01_tcp_echo_one_router(self):
+        # start echo server
+        #server = self.spawn_echo_server(self.tcp_server_listener_port)
+
+        #for size in [1, 5, 10, 50, 100]:
+        #    for count in [1, 5, 10, 50, 100]:
+        #        self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" %
+        #                   (self.tcp_client_listener_port, size, count))
+        #        self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count)
+        #server.join()
+        pass
+
+if __name__== '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org