You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/10/28 18:09:54 UTC
[qpid-dispatch] 03/03: DISPATCH-2261: close dispatcher out link on connector delete
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 4930a80f6381918b7e4e5b16206973e2e53146b6
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Oct 26 09:05:12 2021 -0400
DISPATCH-2261: close dispatcher out link on connector delete
This closes #1398
---
src/adaptors/tcp_adaptor.c | 38 +++++---
tests/system_test.py | 2 +-
tests/system_tests_tcp_adaptor.py | 187 +++++++++++++++++---------------------
3 files changed, 111 insertions(+), 116 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 3e71611..57ab8bc 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -140,6 +140,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn);
static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
static void free_bridge_config(qd_tcp_bridge_t *config);
static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
+static void detach_links(qdr_tcp_connection_t *tc);
// is the incoming byte window full
@@ -195,6 +196,7 @@ static void on_activate(void *context)
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
if (conn->egress_dispatcher && conn->connector_closed) {
+ detach_links(conn);
qdr_connection_set_context(conn->qdr_conn, 0);
qdr_connection_closed(conn->qdr_conn);
conn->qdr_conn = 0;
@@ -473,25 +475,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
+ conn->instream = 0;
}
if (conn->outstream) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream",
conn->conn_id, conn->outgoing_id);
qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
+ conn->outstream = 0;
}
- if (conn->incoming) {
- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
- "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming",
- conn->conn_id, conn->incoming_id);
- qdr_link_detach(conn->incoming, QD_LOST, 0);
- }
- if (conn->outgoing) {
- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
- "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing",
- conn->conn_id, conn->outgoing_id);
- qdr_link_detach(conn->outgoing, QD_LOST, 0);
- }
+
+ detach_links(conn);
+
if (conn->initial_delivery) {
qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, false);
qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
@@ -2060,3 +2055,22 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo
free_qdr_tcp_connection(conn);
}
}
+
+
+static void detach_links(qdr_tcp_connection_t *conn)
+{
+ if (conn->incoming) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] detaching incoming link",
+ conn->conn_id, conn->incoming_id);
+ qdr_link_detach(conn->incoming, QD_LOST, 0);
+ conn->incoming = 0;
+ }
+ if (conn->outgoing) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] detaching outgoing link",
+ conn->conn_id, conn->outgoing_id);
+ qdr_link_detach(conn->outgoing, QD_LOST, 0);
+ conn->outgoing = 0;
+ }
+}
diff --git a/tests/system_test.py b/tests/system_test.py
index 08c96ae..0c45439 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -657,7 +657,7 @@ class Qdrouterd(Process):
def check():
addrs = self.management.query(a_type).get_dicts()
- rc = [a for a in addrs if address in a['name']]
+ rc = [a for a in addrs if a['name'].endswith(address)]
count = 0
for a in rc:
count += a['subscriberCount']
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index fe26dd4..35cc1d7 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -20,6 +20,7 @@
import io
import json
import os
+import socket
import sys
import time
import traceback
@@ -996,39 +997,67 @@ class TcpAdaptorManagementTest(TestCase):
cls.tcp_listener_port = cls.tester.get_port()
cls.test_name = 'TCPMgmtTest'
- # Here we have a simple barebones standalone router config.
- config = [
- ('router', {'mode': 'standalone',
- 'id': cls.test_name}),
+ # create edge and interior routers. The listener/connector will be on
+ # the edge router. It is expected that the edge will create proxy
+ # links to the interior and remove them when the test is done.
+
+ cls.interior_edge_port = cls.tester.get_port()
+ cls.interior_mgmt_port = cls.tester.get_port()
+ cls.edge_mgmt_port = cls.tester.get_port()
+
+ cls.tcp_server_port = cls.tester.get_port()
+ cls.tcp_listener_port = cls.tester.get_port()
+
+ i_config = [
+ ('router', {'mode': 'interior',
+ 'id': 'TCPMgmtTestInterior'}),
('listener', {'role': 'normal',
- 'port': cls.tester.get_port()}),
+ 'port': cls.interior_mgmt_port}),
+ ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]
- config = Qdrouterd.Config(config)
- cls.router = cls.tester.qdrouterd(cls.test_name, config, wait=True)
+ config = Qdrouterd.Config(i_config)
+ cls.i_router = cls.tester.qdrouterd('TCPMgmtTestInterior', config, wait=False)
- # Start the echo server. This is the server that the tcpConnector
- # will be connecting to.
- server_prefix = "ECHO_SERVER ES_%s" % cls.test_name
- parent_path = os.path.dirname(os.getcwd())
- cls.logger = Logger(title="TcpAdaptor",
- print_to_console=True,
- save_for_dump=False,
- ofilename=os.path.join(parent_path, "setUpClass/TcpAdaptor_echo_server.log"))
- cls.echo_server = TcpEchoServer(prefix=server_prefix,
- port=cls.tcp_server_port,
- logger=cls.logger)
- # The router and the echo server are running at this point.
- assert cls.echo_server.is_running
+ e_config = [
+ ('router', {'mode': 'edge',
+ 'id': 'TCPMgmtTestEdge'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.edge_mgmt_port}),
+ ('connector', {'name': 'edge', 'role': 'edge',
+ 'port': cls.interior_edge_port}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ]
+ config = Qdrouterd.Config(e_config)
+ cls.e_router = cls.tester.qdrouterd('TCPMgmtTestEdge', config,
+ wait=False)
+
+ cls.i_router.wait_ready()
+ cls.e_router.wait_ready()
+
+ def _query_links_by_addr(self, router_mgmt, owning_addr):
+ oid = 'org.apache.qpid.dispatch.router.link'
+ attrs = ['owningAddr', 'linkDir']
+
+ links = []
+ rc = router_mgmt.query(type=oid, attribute_names=attrs).results
+ for link in rc:
+ if link[0] is not None and link[0].endswith(owning_addr):
+ links.append(link)
+ return links
@unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
def test_01_mgmt(self):
"""
- Create and delete TCP connectors and listeners
+ Create and delete TCP connectors and listeners. Ensure that the service
+ address is properly removed on the interior router.
"""
LISTENER_TYPE = 'org.apache.qpid.dispatch.tcpListener'
CONNECTOR_TYPE = 'org.apache.qpid.dispatch.tcpConnector'
- mgmt = self.router.management
+ mgmt = self.e_router.management
# When starting out, there should be no tcpListeners or tcpConnectors.
self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
@@ -1052,93 +1081,45 @@ class TcpAdaptorManagementTest(TestCase):
self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
- # Give a second for the tcpListener to start listening.
- time.sleep(1)
- # Start the echo client runner
- client_runner_timeout = 3
- runner = EchoClientRunner(self.test_name, 1, self.logger,
- None, None, 100, 1,
- timeout=client_runner_timeout,
- port_override=self.tcp_listener_port)
- result = None
+ # now verify that the interior router sees the service address
+ # and that a single outgoing proxy link is created for it
+ self.i_router.wait_address(self.test_name, subscribers=1)
+ while True:
+ links = self._query_links_by_addr(self.i_router.management,
+ self.test_name)
+ if links:
+ # expect a single consumer link that represents
+ # the connector
+ self.assertEqual(1, len(links))
+ self.assertEqual("out", links[0][1])
+ break
+ time.sleep(0.25)
- # Give some time for the client runner to finish up.
- time.sleep(client_runner_timeout + 1)
-
- # Make sure servers are still up
- if self.echo_server.error:
- self.logger.log(
- "TCP_TEST %s Server %s stopped with error: %s" %
- (self.test_name, self.echo_server.prefix,
- self.echo_server.error))
- result = self.echo_server.error
-
- if self.echo_server.exit_status:
- self.logger.log(
- "TCP_TEST %s Server %s stopped with status: %s" %
- (self.test_name, self.echo_server.prefix, self.echo_server.exit_status))
- result = self.echo_server.exit_status
-
- self.assertIsNone(result)
-
- error = runner.client_error()
- if error is not None:
- self.logger.log("TCP_TEST %s Client %s stopped with error: %s" %
- (self.test_name, runner.name, error))
-
- self.assertIsNone(error)
- status = runner.client_exit_status()
- if status is not None:
- self.logger.log("TCP_TEST %s Client %s stopped with status: %s" %
- (self.test_name, runner.name, status))
- self.assertIsNone(status)
- self.assertFalse(runner.client_running())
-
- # Delete the connector and make sure the echo client fails.
+ # Delete the connector and listener
out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name)
self.assertIsNone(out)
-
- # Give some time for the connector to be deleted by the router.
- # Deleting a connector also involves deleting existing connections
- # that were made using the details from the connector.
- # In this case, the router would have to drop the connection it
- # already made to the echo server, so let's give it some time to
- # do that.
- time.sleep(2)
-
- client_runner_timeout = 2
- # Start the echo client runner
- runner = EchoClientRunner(self.test_name, 1, self.logger,
- None, None, 100, 1,
- # Try for 2 seconds before timing out
- timeout=client_runner_timeout,
- port_override=self.tcp_listener_port)
- time.sleep(client_runner_timeout + 1)
- exit_status = runner.client_exit_status()
-
- if exit_status is not None:
- # The test is a success, the echo client sender timed out
- # because it did not receive anything back from the
- # echo server because the connector to the echo server
- # got deleted
- self.logger.log("TCP_TEST %s Client %s timedout with error: %s" %
- (self.test_name, runner.name, exit_status))
- else:
- self.logger.log("ERROR: Connector not deleted")
- self.assertIsNotNone(exit_status)
-
- # Now delete the tcpListener
+ self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
out = mgmt.delete(type=LISTENER_TYPE, name=listener_name)
self.assertIsNone(out)
+ self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
- runner = EchoClientRunner(self.test_name, 1, self.logger,
- None, None, 100, 1,
- # Try for 2 seconds before timing out
- timeout=client_runner_timeout,
- port_override=self.tcp_listener_port)
- time.sleep(client_runner_timeout + 1)
- error = runner.client_error()
- self.assertIn("ConnectionRefusedError", error)
+ # verify the service address and proxy links are no longer active on
+ # the interior router
+ self.i_router.wait_address_unsubscribed(self.test_name)
+ while True:
+ links = self._query_links_by_addr(self.i_router.management,
+ self.test_name)
+ if len(links) == 0:
+ break
+ time.sleep(0.25)
+
+ # verify that clients can no longer connect to the listener
+ client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client_conn.setblocking(True)
+ client_conn.settimeout(5)
+ with self.assertRaises(ConnectionRefusedError):
+ client_conn.connect(('127.0.0.1', self.tcp_listener_port))
+ client_conn.close()
if __name__ == '__main__':
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org