You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/06/14 20:41:45 UTC

[qpid-dispatch] branch main updated: DISPATCH-2161, DISPATCH-2167: handle various client close errors

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 4770312  DISPATCH-2161, DISPATCH-2167: handle various client close errors
4770312 is described below

commit 4770312a4bbb61d544ed7562c15e6810067293dc
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Fri Jun 4 15:32:23 2021 -0400

    DISPATCH-2161, DISPATCH-2167: handle various client close errors
    
    This closes #1251
---
 src/adaptors/http1/http1_adaptor.c  |  12 +--
 src/adaptors/http1/http1_client.c   |  44 ++++++++--
 src/adaptors/http1/http1_server.c   |  80 ++++++++++++++----
 tests/http1_tests.py                | 156 ++++++++++++++++++++++++++++++++++++
 tests/system_tests_http1_adaptor.py |  41 +++++++++-
 5 files changed, 300 insertions(+), 33 deletions(-)

diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 0903529..5874ec9 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -143,7 +143,8 @@ void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
 {
     if (out_data) {
         // expect: all buffers returned from proton!
-        assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
+        // FIXME: not during router shutdown!
+        // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
         qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
         while (od) {
             DEQ_REMOVE_HEAD(out_data->fifo);
@@ -586,8 +587,8 @@ static uint64_t _core_link_deliver(void *context, qdr_link_t *link, qdr_delivery
 
     if (hconn) {
         qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] Core link deliver %p %s", hconn->conn_id, link->identity,
-               (void*)delivery, settled ? "settled" : "unsettled");
+               DLV_FMT" Core link deliver (%s)", DLV_ARGS(delivery),
+               settled ? "settled" : "unsettled");
 
         if (hconn->type == HTTP1_CONN_SERVER)
             outcome = qdr_http1_server_core_link_deliver(qdr_http1_adaptor, hconn, link, delivery, settled);
@@ -619,10 +620,9 @@ static void _core_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t d
     qdr_http1_request_base_t *hreq = (qdr_http1_request_base_t*) qdr_delivery_get_context(dlv);
     if (hreq) {
         qdr_http1_connection_t *hconn = hreq->hconn;
-        qdr_link_t *link = qdr_delivery_link(dlv);
         qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] Core Delivery update disp=0x%"PRIx64" %s",
-               hconn->conn_id, link->identity, disp,
+               DLV_FMT" core delivery update disp=0x%"PRIx64" %s",
+               DLV_ARGS(dlv), disp,
                settled ? "settled" : "unsettled");
 
         if (hconn->type == HTTP1_CONN_SERVER)
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 565da12..204f7a7 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -366,6 +366,18 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
 }
 
 
+static void _handle_conn_read_close(qdr_http1_connection_t *hconn)
+{
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for reading", hconn->conn_id);
+}
+
+
+static void _handle_conn_write_close(qdr_http1_connection_t *hconn)
+{
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for writing", hconn->conn_id);
+}
+
+
 // handle PN_RAW_CONNECTION_READ
 static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
 {
@@ -378,6 +390,17 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
                "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)",
                hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
         hconn->in_http1_octets += length;
+
+        if (HTTP1_DUMP_BUFFERS) {
+            fprintf(stdout, "\nClient raw buffer READ %"PRIuMAX" total octets\n", length);
+            qd_buffer_t *bb = DEQ_HEAD(blist);
+            while (bb) {
+                fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+                bb = DEQ_NEXT(bb);
+            }
+            fflush(stdout);
+        }
+
         error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
     }
     return error;
@@ -413,11 +436,13 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         _setup_client_connection(hconn);
         break;
     }
-    case PN_RAW_CONNECTION_CLOSED_READ:
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+        _handle_conn_read_close(hconn);
+        pn_raw_connection_close(hconn->raw_conn);
+        break;
+    }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for %s", hconn->conn_id,
-               pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
-               ? "reading" : "writing");
+        _handle_conn_write_close(hconn);
         pn_raw_connection_close(hconn->raw_conn);
         break;
     }
@@ -434,10 +459,12 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
 
         if (hconn->out_link) {
             qdr_link_set_context(hconn->out_link, 0);
+            qdr_link_detach(hconn->out_link, QD_LOST, 0);
             hconn->out_link = 0;
         }
         if (hconn->in_link) {
             qdr_link_set_context(hconn->in_link, 0);
+            qdr_link_detach(hconn->in_link, QD_LOST, 0);
             hconn->in_link = 0;
         }
         if (hconn->qdr_conn) {
@@ -944,7 +971,7 @@ static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
 }
 
 
-// The coded has completed processing the request and response messages.
+// The codec has completed processing the request and response messages.
 //
 static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled)
 {
@@ -962,7 +989,7 @@ static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool c
                "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" %s. Octets read: %"PRIu64" written: %"PRIu64,
                hreq->base.hconn->conn_id,
                hreq->base.msg_id,
-               cancelled ? "cancelled!" : "codec done",
+               cancelled ? "cancelled" : "codec done",
                in_octets, out_octets);
     }
 }
@@ -1651,13 +1678,14 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
 static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
 {
     DEQ_REMOVE(req->responses, rmsg);
+    qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
+
     if (rmsg->dlv) {
         qdr_delivery_set_context(rmsg->dlv, 0);
         qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 client response delivery settled");
+        rmsg->dlv = 0;
     }
 
-    qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
-
     free__client_response_msg_t(rmsg);
 }
 
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 4b05d47..bcdb3e5 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -49,6 +49,7 @@ typedef struct _server_response_msg_t {
     qd_composed_field_t *msg_props; // hold incoming headers
     qdr_delivery_t      *dlv;       // inbound to router (qdr_link_deliver)
     bool                 rx_complete; // response rx complete
+    bool                 discard;     // client no longer present
 } _server_response_msg_t;
 ALLOC_DECLARE(_server_response_msg_t);
 ALLOC_DEFINE(_server_response_msg_t);
@@ -715,6 +716,7 @@ static bool _process_request(_server_request_t *hreq)
             if (rmsg->dlv) {
                 qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
                 qdr_delivery_set_aborted(rmsg->dlv);
+                qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, true);
             }
             _server_response_msg_free(hreq, rmsg);
             rmsg = DEQ_HEAD(hreq->responses);
@@ -1025,15 +1027,20 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
     }
 
     _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
+    if (rmsg->discard) {
+        qd_buffer_list_free_buffers(body);
+        return 0;
+    }
 
     qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
-
-
     qd_message_stream_data_append(msg, body, &q2_blocked);
     hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
     if (q2_blocked) {
         // note: unit tests grep for this log!
-        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id);
+        if (rmsg->dlv)
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, DLV_FMT" server link blocked on Q2 limit", DLV_ARGS(rmsg->dlv));
+        else
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id);
     }
 
     //
@@ -1191,14 +1198,32 @@ void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
                                            bool                      settled)
 {
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] HTTP response delivery update, outcome=0x%"PRIx64"%s",
-           hconn->conn_id, hconn->in_link_id, disp, settled ? " settled": "");
-
-    // Not much can be done with error dispositions (I think)
-    if (disp != PN_ACCEPTED) {
-        qd_log(adaptor->log, QD_LOG_WARNING,
-               "[C%"PRIu64"][L%"PRIu64"] response message was not accepted, outcome=0x%"PRIx64,
-               hconn->conn_id, hconn->in_link_id, disp);
+           DLV_FMT" HTTP response delivery update, outcome=0x%"PRIx64"%s",
+           DLV_ARGS(dlv), disp, settled ? " settled": "");
+
+    if (settled) {
+        if (disp && disp != PN_ACCEPTED) {
+            qd_log(adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] response message was not accepted, outcome=0x%"PRIx64,
+                   hconn->conn_id, hconn->in_link_id, disp);
+
+            // This indicates the client that originated the request is
+            // unreachable. Rather than drop the connection allow the server to
+            // finish the response. We'll simply discard the response data as
+            // it is read from the raw connection
+            _server_request_t *hreq = (_server_request_t*)hbase;
+            _server_response_msg_t *rmsg  = DEQ_HEAD(hreq->responses);
+            while (rmsg) {
+                if (rmsg->dlv == dlv) {
+                    rmsg->discard = true;
+                    qd_message_t *msg = qdr_delivery_message(dlv);
+                    qd_message_set_discard(msg, true);
+                    qd_message_Q2_holdoff_disable(msg);
+                    break;
+                }
+                rmsg = DEQ_NEXT(rmsg);
+            }
+        }
     }
     if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
         _server_request_t *hreq = (_server_request_t*)hbase;
@@ -1529,6 +1554,16 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
 
     _server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
     if (!hreq) {
+
+        if (qd_message_aborted(msg)) {
+            // can safely discard since it was yet to be processed
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   DLV_FMT" Discarding aborted request", DLV_ARGS(delivery));
+            qd_message_set_send_complete(msg);
+            qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+            return PN_REJECTED;
+        }
+
         // new delivery - create new request:
         switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
         case QD_MESSAGE_DEPTH_INCOMPLETE:
@@ -1536,8 +1571,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
 
         case QD_MESSAGE_DEPTH_INVALID:
             qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-                   "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
-                   hconn->conn_id, link->identity);
+                   DLV_FMT" Malformed HTTP/1.x message", DLV_ARGS(delivery));
             qd_message_set_send_complete(msg);
             qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
             return PN_REJECTED;
@@ -1546,7 +1580,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
             hreq = _create_request_context(hconn, msg);
             if (!hreq) {
                 qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-                       "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
+                       DLV_FMT" Discarding malformed message.", DLV_ARGS(delivery));
                 qd_message_set_send_complete(msg);
                 qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
                 return PN_REJECTED;
@@ -1557,8 +1591,22 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
             qdr_delivery_incref(delivery, "HTTP1 server referencing request delivery");
             break;
         }
+
+    } else if (qd_message_aborted(msg)) {
+        //
+        // The client has aborted the request message.  This can happen when
+        // the HTTP client has dropped its connection mid-request message. If
+        // this request has not yet been written to the server it can be safely
+        // discard.  However, if some of the request has been sent then the
+        // connection to the server must be bounced in order to recover.
+        //
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               DLV_FMT" Client has aborted the request", DLV_ARGS(delivery));
+        _cancel_request(hreq);
+        return 0;
     }
 
+    // send request in the proper order
     if (DEQ_HEAD(hconn->requests) == &hreq->base)
         _send_request_message(hreq);
 
@@ -1596,13 +1644,13 @@ static void _server_request_free(_server_request_t *hreq)
 {
     if (hreq) {
         qdr_http1_request_base_cleanup(&hreq->base);
+        qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
         if (hreq->request_dlv) {
             qdr_delivery_set_context(hreq->request_dlv, 0);
             qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing request delivery");
+            hreq->request_dlv = 0;
         }
 
-        qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
-
         _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
         while (rmsg) {
             _server_response_msg_free(hreq, rmsg);
diff --git a/tests/http1_tests.py b/tests/http1_tests.py
index aece59a..e74b6f1 100644
--- a/tests/http1_tests.py
+++ b/tests/http1_tests.py
@@ -1200,3 +1200,159 @@ class Http1Edge2EdgeTestBase(TestCase):
         cls.http_listener11_port = cls.tester.get_port()
         cls.http_server10_port = cls.tester.get_port()
         cls.http_listener10_port = cls.tester.get_port()
+
+
+class Http1ClientCloseTestsMixIn(object):
+    """
+    Generic test functions for simulating HTTP/1.x client connection drops.
+    """
+    def client_request_close_test(self, server_port, client_port, server_mgmt):
+        """
+        Simulate an HTTP client drop while sending a very large PUT request
+        """
+        PING = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_04_client_request_close/ping",
+                            headers={"Content-Length": "0"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={
+                                 "Content-Length": "19",
+                                 "Content-Type": "text/plain;charset=utf-8",
+                             },
+                             body=b'END OF TRANSMISSION'),
+                 ResponseValidator(status=200)
+                 )]
+        }
+
+        TESTS = {
+            "PUT": [
+                (RequestMsg("PUT", "/PUT/test_04_client_request_close",
+                            headers={
+                                "Content-Length": "500000",
+                                "Content-Type": "text/plain;charset=utf-8"
+                            },
+                            body=b'4' * (500000 - 19) + b'END OF TRANSMISSION'),
+                 ResponseMsg(201, reason="Created",
+                             headers={"Test-Header": "/PUT/test_04_client_request_close",
+                                      "Content-Length": "0"}),
+                 ResponseValidator(status=201)
+                 )]
+        }
+        TESTS.update(PING)
+
+        server = TestServer(server_port=server_port,
+                            client_port=client_port,
+                            tests=TESTS)
+        #
+        # ensure the server has fully connected
+        #
+        client = ThreadedTestClient(PING, client_port)
+        client.wait()
+
+        #
+        # Simulate an HTTP client that dies during the sending of the PUT
+        # request
+        #
+
+        fake_request = b'PUT /PUT/test_04_client_request_close HTTP/1.1\r\n' \
+            + b'Content-Length: 500000\r\n' \
+            + b'Content-Type: text/plain;charset=utf-8\r\n' \
+            + b'\r\n' \
+            + b'?' * 50000
+        fake_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        fake_client.settimeout(5)
+        fake_client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        fake_client.connect(("127.0.0.1", client_port))
+        fake_client.sendall(fake_request, socket.MSG_WAITALL)
+        fake_client.close()
+
+        # since socket I/O is asynchronous wait until the request arrives
+        # at the server
+
+        expected = len(fake_request)
+        bytes_in = 0
+        while expected > bytes_in:
+            ri = server_mgmt.query(type="org.apache.qpid.dispatch.httpRequestInfo").get_entities()
+            bytes_in = ri[-1]['bytesIn'] if ri else 0  # most recent request at tail
+            sleep(0.1)
+
+        # now ensure the connection between the router and the HTTP server
+        # still functions:
+        client = ThreadedTestClient(PING, client_port)
+        client.wait()
+        server.wait()
+
+    def client_response_close_test(self, server_port, client_port):
+        """
+        Simulate an HTTP client drop while the server is sending a very large
+        response message.
+        """
+        PING = {
+            "PUT": [
+                (RequestMsg("PUT", "/PUT/test_05_client_response_close/ping",
+                            headers={"Content-Length": "1",
+                                     "content-type":
+                                     "text/plain;charset=utf-8"},
+                            body=b'X'),
+                 ResponseMsg(201, reason="Created",
+                             headers={"Content-Length": "0"}),
+                 ResponseValidator(status=201)
+                 )]
+        }
+
+        big_headers = dict([('Huge%s' % i, chr(ord(b'0') + i) * 8000)
+                            for i in range(10)])
+
+        TESTS = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_05_client_response_close",
+                            headers={
+                                "Content-Length": "0",
+                                "Content-Type": "text/plain;charset=utf-8"
+                            }),
+                 [ResponseMsg(100, reason="Continue", headers=big_headers),
+                  ResponseMsg(100, reason="Continue", headers=big_headers),
+                  ResponseMsg(100, reason="Continue", headers=big_headers),
+                  ResponseMsg(100, reason="Continue", headers=big_headers),
+                  ResponseMsg(200,
+                              reason="OK",
+                              headers={"Content-Length": 1000000,
+                                       "Content-Type": "text/plain;charset=utf-8"},
+                              body=b'?' * 1000000)],
+                 ResponseValidator(status=200)
+                 )]
+        }
+        TESTS.update(PING)
+
+        server = TestServer(server_port=server_port,
+                            client_port=client_port,
+                            tests=TESTS)
+        #
+        # ensure the server has fully connected
+        #
+        client = ThreadedTestClient(PING, client_port)
+        client.wait()
+
+        #
+        # Simulate an HTTP client that dies during the receipt of the
+        # response
+        #
+
+        fake_request = b'GET /GET/test_05_client_response_close HTTP/1.1\r\n' \
+            + b'Content-Length: 0\r\n' \
+            + b'Content-Type: text/plain;charset=utf-8\r\n' \
+            + b'\r\n'
+        fake_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        fake_client.settimeout(TIMEOUT)
+        fake_client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        fake_client.connect(("127.0.0.1", client_port))
+        fake_client.sendall(fake_request, socket.MSG_WAITALL)
+        fake_client.recv(1)
+        fake_client.close()
+
+        #
+        # Verify the server is still reachable
+        #
+        client = ThreadedTestClient(PING, client_port)
+        client.wait()
+        server.wait()
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index e4472a4..c9672aa 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -47,6 +47,7 @@ from http1_tests import ThreadedTestClient, Http1OneRouterTestBase
 from http1_tests import CommonHttp1OneRouterTest
 from http1_tests import CommonHttp1Edge2EdgeTest
 from http1_tests import Http1Edge2EdgeTestBase
+from http1_tests import Http1ClientCloseTestsMixIn
 
 
 class Http1AdaptorManagementTest(TestCase):
@@ -254,7 +255,9 @@ class Http1AdaptorOneRouterTest(Http1OneRouterTestBase,
             assert_approximately_equal(stats[1].get('bytesIn'), 8830)
 
 
-class Http1AdaptorEdge2EdgeTest(Http1Edge2EdgeTestBase, CommonHttp1Edge2EdgeTest):
+class Http1AdaptorEdge2EdgeTest(Http1Edge2EdgeTestBase,
+                                CommonHttp1Edge2EdgeTest,
+                                Http1ClientCloseTestsMixIn):
     """
     Test an HTTP servers and clients attached to edge routers separated by an
     interior router
@@ -313,6 +316,21 @@ class Http1AdaptorEdge2EdgeTest(Http1Edge2EdgeTestBase, CommonHttp1Edge2EdgeTest
         cls.INT_A.wait_address('EA1')
         cls.INT_A.wait_address('EA2')
 
+    def test_1001_client_request_close(self):
+        """
+        Simulate an HTTP client drop while sending a very large PUT
+        """
+        self.client_request_close_test(self.http_server11_port,
+                                       self.http_listener11_port,
+                                       self.EA2.management)
+
+    def test_1002_client_response_close(self):
+        """
+        Simulate an HTTP client drop while server sends very large response
+        """
+        self.client_response_close_test(self.http_server11_port,
+                                        self.http_listener11_port)
+
 
 class FakeHttpServerBase(object):
     """
@@ -353,7 +371,8 @@ class FakeHttpServerBase(object):
         sleep(0.5)  # fudge factor allow socket close to complete
 
 
-class Http1AdaptorBadEndpointsTest(TestCase):
+class Http1AdaptorBadEndpointsTest(TestCase,
+                                   Http1ClientCloseTestsMixIn):
     """
     Subject the router to mis-behaving HTTP endpoints.
     """
@@ -498,7 +517,8 @@ class Http1AdaptorBadEndpointsTest(TestCase):
 
         body_filler = "?" * 1024 * 300  # Q2
 
-        # fake server
+        # fake server - just to create a sink for the "fakeServer" address so
+        # credit will be granted.
         rx = AsyncTestReceiver(self.INT_A.listener,
                                source="fakeServer")
 
@@ -574,6 +594,21 @@ class Http1AdaptorBadEndpointsTest(TestCase):
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+    def test_04_client_request_close(self):
+        """
+        Simulate an HTTP client drop while sending a very large PUT
+        """
+        self.client_request_close_test(self.http_server_port,
+                                       self.http_listener_port,
+                                       self.INT_A.management)
+
+    def test_05_client_response_close(self):
+        """
+        Simulate an HTTP client drop while server sends very large response
+        """
+        self.client_response_close_test(self.http_server_port,
+                                        self.http_listener_port)
+
 
 class Http1AdaptorQ2Standalone(TestCase):
     """

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org