You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/09/30 19:24:03 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1744: HTTP1 protocol adaptor (WIP)

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new 1c054ad  DISPATCH-1744: HTTP1 protocol adaptor (WIP)
1c054ad is described below

commit 1c054ad1196ccecd6574ad1413a535814d513402
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Aug 13 10:15:07 2020 -0400

    DISPATCH-1744: HTTP1 protocol adaptor (WIP)
    
    This closes #855
---
 include/qpid/dispatch/http1_codec.h               |  229 ++++
 include/qpid/dispatch/http1_lib.h                 |  118 --
 python/qpid_dispatch/management/qdrouter.json     |    1 +
 src/CMakeLists.txt                                |    5 +-
 src/adaptors/adaptor_utils.c                      |  147 +++
 src/adaptors/adaptor_utils.h                      |   54 +
 src/adaptors/http1/http1_adaptor.c                |  707 +++++++++-
 src/adaptors/http1/http1_client.c                 | 1306 +++++++++++++++++++
 src/adaptors/http1/{http1_lib.c => http1_codec.c} |  634 +++++----
 src/adaptors/http1/http1_private.h                |  272 ++++
 src/adaptors/http1/http1_server.c                 | 1430 +++++++++++++++++++++
 src/adaptors/http_common.c                        |    6 +-
 src/adaptors/http_common.h                        |    3 +-
 src/message.c                                     |    2 +-
 tests/CMakeLists.txt                              |    1 +
 tests/system_tests_http1_adaptor.py               |  722 +++++++++++
 16 files changed, 5240 insertions(+), 397 deletions(-)

diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
new file mode 100644
index 0000000..3682dd1
--- /dev/null
+++ b/include/qpid/dispatch/http1_codec.h
@@ -0,0 +1,229 @@
+#ifndef http1_codec_H
+#define http1_codec_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/message.h>
+
+#include <inttypes.h>
+#include <stdbool.h>
+
+
+// HTTP/1.x Encoder/Decoder Library
+//
+// This library provides an API for encoding and decoding HTTP/1.x messages.
+//
+// The decoder takes qd_buffer_t chains containing HTTP/1.x data read from the
+// TCP connection and issues callbacks as various parts (headers, body, status)
+// of the HTTP/1.x message are parsed.
+//
+// The encoder allows the application to construct an HTTP/1.x message. An API
+// is provided for building the message and callbacks are invoked when the
+// encoder has full qd_buffer_t or body_data to send out the TCP connection.
+//
+// This library provides two classes:
+//
+// * h1_codec_connection_t - a context for a single TCP connection over which
+//   HTTP/1.x messages are exchanged.
+//
+// * h1_codec_request_state_t - a context which tracks the state of a single
+//   HTTP/1.x Request <-> Response message exchange. Multiple
+//   h1_codec_request_state_t can be associated with an h1_codec_connection_t due to
+//   request pipelining.
+//
+
+
+#define HTTP1_VERSION_1_1  "HTTP/1.1"
+#define HTTP1_VERSION_1_0  "HTTP/1.0"
+
+typedef struct h1_codec_connection_t    h1_codec_connection_t;
+typedef struct h1_codec_request_state_t h1_codec_request_state_t;
+
+
+typedef enum {
+    HTTP1_CONN_CLIENT,  // connection initiated by client
+    HTTP1_CONN_SERVER,  // connection to server
+} h1_codec_connection_type_t;
+
+
+typedef enum {
+    HTTP1_STATUS_BAD_REQ = 400,
+    HTTP1_STATUS_SERVER_ERR = 500,
+    HTTP1_STATUS_BAD_VERSION = 505,
+    HTTP1_STATUS_SERVICE_UNAVAILABLE = 503,
+} h1_codec_status_code_t;
+
+
+typedef struct h1_codec_config_t {
+
+    h1_codec_connection_type_t type;
+
+    // Callbacks to send data out the raw connection.  These callbacks are
+    // triggered by the message creation API (h1_codec_tx_*) Note well: these
+    // callbacks are called in the order in which the data must be written out
+    // the raw connection!
+
+    // tx_buffers()
+    // Send a list of buffers containing encoded HTTP message data. The caller
+    // assumes ownership of the buffer list and must release the buffers when
+    // done.  len is set to the total octets of data in the list.
+    //
+    void (*tx_buffers)(h1_codec_request_state_t *hrs, qd_buffer_list_t *data, unsigned int len);
+
+    // tx_body_data()
+    // Called with body_data containing encoded HTTP message data. Only
+    // called if the outgoing HTTP message has a body. The caller assumes
+    // ownership of the body_data and must release it when done.
+    //
+    void (*tx_body_data)(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+
+    //
+    // RX message callbacks
+    //
+    // These callbacks should return 0 on success or non-zero on error.  A
+    // non-zero return code is used as the return code from
+    // h1_codec_connection_rx_data()
+    //
+
+    // HTTP request received - new h1_codec_request_state_t created (hrs).  This
+    // hrs must be supplied in the h1_codec_tx_response() method when sending the
+    // response.
+    int (*rx_request)(h1_codec_request_state_t *hrs,
+                      const char *method,
+                      const char *target,
+                      uint32_t   version_major,
+                      uint32_t   version_minor);
+
+    // HTTP response received - the h1_codec_request_state_t comes from the return
+    // value of the h1_codec_tx_request() method used to create the corresponding
+    // request.  Note well that if status_code is Informational (1xx) then this
+    // response is NOT the last response for the current request (See RFC7231,
+    // 6.2 Informational 1xx).  The request_done callback will be called after
+    // the LAST response has been received.
+    //
+    int (*rx_response)(h1_codec_request_state_t *hrs,
+                       int status_code,
+                       const char *reason_phrase,
+                       uint32_t version_major,
+                       uint32_t version_minor);
+
+    int (*rx_header)(h1_codec_request_state_t *hrs, const char *key, const char *value);
+    int (*rx_headers_done)(h1_codec_request_state_t *hrs, bool has_body);
+
+    int (*rx_body)(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, uintmax_t len, bool more);
+
+    // Invoked after a received HTTP message has been completely parsed.
+    //
+    void (*rx_done)(h1_codec_request_state_t *hrs);
+
+    // Invoked when the final response message has been decoded (server
+    // connection) or encoded (client connection), or the request has been cancelled.
+    // hrs is freed on return from this callback and must not be referenced further.
+    void (*request_complete)(h1_codec_request_state_t *hrs,
+                             bool cancelled);
+
+} h1_codec_config_t;
+
+
+// create a new connection and assign it a context
+//
+h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context);
+void *h1_codec_connection_get_context(h1_codec_connection_t *conn);
+
+// Notify the codec that the endpoint closed the connection.  This should be
+// called for server connections only. Once the server has reconnected it is
+// safe to resume calling h1_codec_connection_rx_data().  This method is a
+// no-op for client connections.  When a client connection closes the
+// application must cancel all outstanding requests and then call
+// h1_codec_connection_free() instead.
+//
+void h1_codec_connection_closed(h1_codec_connection_t *conn);
+
+// Release the codec.  This can only be done after all outstanding requests
+// have been completed or cancelled.
+//
+void h1_codec_connection_free(h1_codec_connection_t *conn);
+
+// Push inbound network data into the http1 library. All rx_*() callbacks occur
+// during this call.  The return value is zero on success.  If a non-zero value
+// is returned the codec state is unknown - the application must cancel all
+// outstanding requests and destroy the conn by calling
+// h1_codec_connection_free().
+//
+int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, uintmax_t len);
+
+void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context);
+void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs);
+h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs);
+
+// Cancel the request.  The h1_codec_request_state_t is freed during this call.
+// The request_complete callback will be invoked during this call with
+// cancelled=True.
+//
+void h1_codec_request_state_cancel(h1_codec_request_state_t *hrs);
+
+const char *h1_codec_request_state_method(const h1_codec_request_state_t *hrs);
+
+// true when codec has encoded/decoded a complete request message
+bool h1_codec_request_complete(const h1_codec_request_state_t *hrs);
+
+// true when codec has encoded/decoded a complete response message
+bool h1_codec_response_complete(const h1_codec_request_state_t *hrs);
+
+//
+// API for sending HTTP/1.x messages
+//
+// The tx_msg_headers and tx_msg_body callbacks can occur during any of these
+// calls.
+//
+
+
+// initiate a request - this creates a new request state context
+//
+h1_codec_request_state_t *h1_codec_tx_request(h1_codec_connection_t *conn, const char *method, const char *target,
+                                              uint32_t version_major, uint32_t version_minor);
+
+// Respond to a received request - the request state context should be the one
+// supplied during the corresponding rx_request callback.  It is required that
+// the caller issues responses in the same order as requests arrive.
+//
+int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const char *reason_phrase,
+                         uint32_t version_major, uint32_t version_minor);
+
+// add header to outgoing message
+//
+int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value);
+
+// Stream outgoing body data.  Ownership of body_data is passed to the caller.
+//
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+
+// outgoing message construction complete.  The request_complete() callback MAY
+// occur during this call.
+//
+// need_close: set to true if the message is a response that does not provide
+// an explict body length. If true it is up to the caller to close the
+// underlying socket connection after all outgoing data for this request has
+// been sent.
+//
+int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close);
+
+
+#endif // http1_codec_H
diff --git a/include/qpid/dispatch/http1_lib.h b/include/qpid/dispatch/http1_lib.h
deleted file mode 100644
index 58385e0..0000000
--- a/include/qpid/dispatch/http1_lib.h
+++ /dev/null
@@ -1,118 +0,0 @@
-#ifndef http1_lib_H
-#define http1_lib_H 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <qpid/dispatch/buffer.h>
-
-#include <inttypes.h>
-
-#define HTTP1_VERSION_1_1  "HTTP/1.1"
-#define HTTP1_VERSION_1_0  "HTTP/1.0"
-
-typedef struct http1_conn_t     http1_conn_t;
-typedef struct http1_transfer_t http1_transfer_t;
-
-
-typedef enum {
-    HTTP1_CONN_CLIENT,  // connection initiated by client
-    HTTP1_CONN_SERVER,  // connection to server
-} http1_conn_type_t;
-
-typedef enum {
-    HTTP1_STATUS_BAD_REQ = 400,
-    HTTP1_STATUS_SERVER_ERR = 500,
-    HTTP1_STATUS_BAD_VERSION = 505,
-} http1_status_code_t;
-
-typedef struct http1_conn_config_t {
-
-    http1_conn_type_t type;
-
-    // called with output data to write to the network
-    void (*conn_tx_data)(http1_conn_t *conn, qd_buffer_list_t *data, size_t offset, unsigned int len);
-
-    // @TODO(kgiusti) - remove?
-    //void (*conn_error)(http1_conn_t *conn, int code, const char *reason);
-
-    //
-    // RX message callbacks
-    //
-
-    // HTTP request received - new transfer created (xfer).  This xfer must be
-    // supplied in the http1_response() method
-    int (*xfer_rx_request)(http1_transfer_t *xfer,
-                           const char *method,
-                           const char *target,
-                           const char *version);
-
-    // HTTP response received - the transfer comes from the return value of the
-    // corresponding http1_request method.  Note well that if status_code is
-    // Informational (1xx) then this response is NOT the last response for the
-    // current request (See RFC7231, 6.2 Informational 1xx).  The xfer_done
-    // callback will be called after the LAST response has been received.
-    //
-    int (*xfer_rx_response)(http1_transfer_t *xfer,
-                            const char *version,
-                            int status_code,
-                            const char *reason_phrase);
-
-    int (*xfer_rx_header)(http1_transfer_t *xfer, const char *key, const char *value);
-    int (*xfer_rx_headers_done)(http1_transfer_t *xfer);
-
-    int (*xfer_rx_body)(http1_transfer_t *xfer, qd_buffer_list_t *body, size_t offset, size_t len);
-
-    void (*xfer_rx_done)(http1_transfer_t *xfer);
-
-    // Invoked when the request/response(s) exchange has completed
-    //
-    void (*xfer_done)(http1_transfer_t *xfer);
-} http1_conn_config_t;
-
-
-http1_conn_t *http1_connection(http1_conn_config_t *config, void *context);
-void http1_connection_close(http1_conn_t *conn);
-void *http1_connection_get_context(http1_conn_t *conn);
-
-// push inbound network data into the http1 library
-int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len);
-
-
-//
-// API for sending HTTP/1.1 messages
-//
-void http1_transfer_set_context(http1_transfer_t *xfer, void *context);
-void *http1_transfer_get_context(const http1_transfer_t *xfer);
-http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer);
-
-// initiate a request - this creates a new message transfer context
-http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version);
-
-// respond to a received request - the transfer context should be from the corresponding xfer_rx_request callback
-int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase);
-
-int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value);
-int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len);
-int http1_tx_done(http1_transfer_t *xfer);
-
-
-
-
-
-#endif // http1_lib_H
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index c9deff4..2cf54a4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1135,6 +1135,7 @@
                 "host": {
                     "description":"IP address: ipv4 or ipv6 literal or a host name",
                     "type": "string",
+                    "default": "127.0.0.1",
                     "create": true
                 },
                 "port": {
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 92b0c5f..103cbaa 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,10 +39,13 @@ add_custom_command (
 # Build the qpid-dispatch library.
 set(qpid_dispatch_SOURCES
   adaptors/reference_adaptor.c
+  adaptors/adaptor_utils.c
   adaptors/http_common.c
   adaptors/http2/http2_adaptor.c
-  adaptors/http1/http1_lib.c
+  adaptors/http1/http1_codec.c
   adaptors/http1/http1_adaptor.c
+  adaptors/http1/http1_client.c
+  adaptors/http1/http1_server.c
   adaptors/tcp_adaptor.c
   alloc_pool.c
   amqp.c
diff --git a/src/adaptors/adaptor_utils.c b/src/adaptors/adaptor_utils.c
new file mode 100644
index 0000000..562f488
--- /dev/null
+++ b/src/adaptors/adaptor_utils.c
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "adaptor_utils.h"
+
+#include <proton/netaddr.h>
+#include <string.h>
+
+
+#define RAW_BUFFER_BATCH  16
+
+
+char *qda_raw_conn_get_address(pn_raw_connection_t *socket)
+{
+    const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket);
+    char buffer[1024];
+    int len = pn_netaddr_str(netaddr, buffer, 1024);
+    if (len <= 1024) {
+        return strdup(buffer);
+    } else {
+        return strndup(buffer, 1024);
+    }
+}
+
+
+void qda_raw_conn_get_read_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, uintmax_t *length)
+{
+    pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+
+    DEQ_INIT(*blist);
+    uintmax_t len = 0;
+    size_t n;
+    while ((n = pn_raw_connection_take_read_buffers(conn, buffs, RAW_BUFFER_BATCH)) != 0) {
+        for (size_t i = 0; i < n; ++i) {
+            qd_buffer_t *qd_buf = (qd_buffer_t*)buffs[i].context;
+            assert(qd_buf);
+            if (buffs[i].size) {
+                // set content length:
+                qd_buffer_insert(qd_buf, buffs[i].size);
+                len += buffs[i].size;
+                DEQ_INSERT_TAIL(*blist, qd_buf);
+            } else {  // ignore empty buffers
+                qd_buffer_free(qd_buf);
+            }
+        }
+    }
+
+    *length = len;
+}
+
+
+int qda_raw_conn_grant_read_buffers(pn_raw_connection_t *conn)
+{
+    pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+    if (pn_raw_connection_is_read_closed(conn))
+        return 0;
+
+    int granted = 0;
+    size_t count = pn_raw_connection_read_buffers_capacity(conn);
+    while (count) {
+        size_t batch_ct = 0;
+        for (int i = 0; i < RAW_BUFFER_BATCH; ++i) {
+            qd_buffer_t *buf = qd_buffer();
+            buffs[i].context  = (intptr_t)buf;
+            buffs[i].bytes    = (char*) qd_buffer_base(buf);
+            buffs[i].capacity = qd_buffer_capacity(buf);
+            buffs[i].size     = 0;
+            buffs[i].offset   = 0;
+            batch_ct += 1;
+            count -= 1;
+            if (count == 0)
+                break;
+        }
+        pn_raw_connection_give_read_buffers(conn, buffs, batch_ct);
+        granted += batch_ct;
+    }
+
+    return granted;
+}
+
+
+int qda_raw_conn_write_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, size_t offset)
+{
+    pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+    size_t count = pn_raw_connection_write_buffers_capacity(conn);
+    count = MIN(count, DEQ_SIZE(*blist));
+    int sent = 0;
+
+    // Since count is set to ensure that we never run out of capacity or
+    // buffers to send we can avoid checking that on every loop
+
+    while (count) {
+        size_t batch_ct = 0;
+
+        for (int i = 0; i < RAW_BUFFER_BATCH; ++i) {
+            qd_buffer_t *buf = DEQ_HEAD(*blist);
+            DEQ_REMOVE_HEAD(*blist);
+
+            buffs[i].context  = (intptr_t)buf;
+            buffs[i].bytes    = (char*)qd_buffer_base(buf);
+            buffs[i].capacity = 0;
+            buffs[i].size     = qd_buffer_size(buf) - offset;
+            buffs[i].offset   = offset;
+            offset = 0; // all succeeding bufs start at base
+
+            batch_ct += 1;
+            count -= 1;
+            if (count == 0)
+                break;
+        }
+        pn_raw_connection_write_buffers(conn, buffs, batch_ct);
+        sent += batch_ct;
+    }
+
+    return sent;
+}
+
+
+void qda_raw_conn_free_write_buffers(pn_raw_connection_t *conn)
+{
+    pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+    size_t n;
+    while ((n = pn_raw_connection_take_written_buffers(conn, buffs, RAW_BUFFER_BATCH)) != 0) {
+        for (size_t i = 0; i < n; ++i) {
+            qd_buffer_t *qd_buf = (qd_buffer_t*)buffs[i].context;
+            assert(qd_buf);
+            qd_buffer_free(qd_buf);
+        }
+    }
+}
+
diff --git a/src/adaptors/adaptor_utils.h b/src/adaptors/adaptor_utils.h
new file mode 100644
index 0000000..eb1bed0
--- /dev/null
+++ b/src/adaptors/adaptor_utils.h
@@ -0,0 +1,54 @@
+#ifndef __adaptor_utils_h__
+#define __adaptor_utils_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+#include <proton/raw_connection.h>
+
+// Get the raw connections remote address
+// Caller must free() the result when done.
+//
+char *qda_raw_conn_get_address(pn_raw_connection_t *socket);
+
+// Retrieve all available incoming data buffers from the raw connection.
+// Return the result in blist, with the total number of read octets in *length
+// Note: only those buffers containing data (size != 0) are returned.
+//
+void qda_raw_conn_get_read_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, uintmax_t *length);
+
+// allocate empty read buffers to the raw connection.  This will provide enough buffers
+// to fill the connections capacity.  Returns the number of buffers granted.
+//
+int qda_raw_conn_grant_read_buffers(pn_raw_connection_t *conn);
+
+
+// Write blist buffers to the connection.  Buffers are removed from the HEAD of
+// blist.  Returns the actual number of buffers taken.  offset is an optional
+// offset to the first outgoing octet in the HEAD buffer.
+//
+int qda_raw_conn_write_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, size_t offset);
+
+
+// release all sent buffers held by the connection
+//
+void qda_raw_conn_free_write_buffers(pn_raw_connection_t *conn);
+
+
+#endif // __adaptor_utils_h__
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index beb9eb4..d9d4e4c 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -17,40 +17,709 @@
  * under the License.
  */
 
-#include <qpid/dispatch/http1_lib.h>
-#include <qpid/dispatch/protocol_adaptor.h>
-#include <qpid/dispatch/message.h>
-#include "adaptors/http_common.h"
+#include "http1_private.h"
 
 #include <stdio.h>
 #include <inttypes.h>
 
 
-typedef struct qd_http1_adaptor_t {
-    qdr_core_t               *core;
-    qdr_protocol_adaptor_t   *adaptor;
-    qd_http_lsnr_list_t       listeners;
-    qd_http_connector_list_t  connectors;
-    qd_log_source_t          *log;
-} qd_http1_adaptor_t;
+//
+// This file contains code for the HTTP/1.x protocol adaptor.  This file
+// includes code that is common to both ends of the protocol (e.g. client and
+// server processing).  See http1_client.c and http1_server.c for code specific
+// to HTTP endpoint processing.
+//
 
-//static qd_http1_adaptor_t *http1_adaptor;
+// for debug: will dump raw buffer content to stdout if true
+#define HTTP1_DUMP_BUFFERS false
 
-#define BUFF_BATCH 16
+#define RAW_BUFFER_BATCH  16
 
 
-// dummy for now:
-qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+/*
+  HTTP/1.x <--> AMQP message mapping
+
+  Message Properties Section:
+
+  HTTP Message                  AMQP Message Properties
+  ------------                  -----------------------
+  Request Method                subject field
+
+  Application Properties Section:
+
+  HTTP Message                  AMQP Message App Properties Map
+  ------------                  -------------------------------
+  Request Version               "http:request": "<version|1.1 default>"
+  Response Version              "http:response": "<version|1.1 default>"
+  Response Status Code          "http:status": <int32>
+  Response Reason               "http:reason": <string>
+  Request Target                "http:target": <string>
+  *                             "<lowercase(key)>" <string>
+
+  Notes:
+   - Message App Properties Keys that start with "http:" are reserved by the
+     adaptor for meta-data.
+ */
+
+// @TODO(kgiusti): rx complete + abort ingress deliveries when endpoint dies while msg in flight
+
+
+ALLOC_DEFINE(qdr_http1_out_data_t);
+ALLOC_DEFINE(qdr_http1_connection_t);
+
+
+qdr_http1_adaptor_t *qdr_http1_adaptor;
+
+
+void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq)
 {
-    return 0;
+    if (hreq) {
+        DEQ_REMOVE(hreq->hconn->requests, hreq);
+        h1_codec_request_state_cancel(hreq->lib_rs);
+        free(hreq->response_addr);
+    }
 }
 
-void qd_http1_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) {}
 
-qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
 {
+    if (hconn) {
+
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        DEQ_REMOVE(qdr_http1_adaptor->connections, hconn);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+        // request expected to be clean up by caller
+#if 0  // JIRA ME!
+        assert(DEQ_IS_EMPTY(hconn->requests));
+#endif
+
+        h1_codec_connection_free(hconn->http_conn);
+        if (hconn->raw_conn) {
+            pn_raw_connection_set_context(hconn->raw_conn, 0);
+            pn_raw_connection_close(hconn->raw_conn);
+        }
+#if 0
+        if (hconn->out_link) {
+            qdr_link_set_context(hconn->out_link, 0);
+            qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+        }
+        if (hconn->in_link) {
+            qdr_link_set_context(hconn->in_link, 0);
+            qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+        }
+        if (hconn->qdr_conn) {
+            qdr_connection_set_context(hconn->qdr_conn, 0);
+            qdr_connection_closed(hconn->qdr_conn);
+        }
+#endif
+
+        free(hconn->cfg.host);
+        free(hconn->cfg.port);
+        free(hconn->cfg.address);
+        free(hconn->cfg.host_port);
+
+        free(hconn->client.client_ip_addr);
+        free(hconn->client.reply_to_addr);
+
+        qd_timer_free(hconn->server.reconnect_timer);
+
+        free_qdr_http1_connection_t(hconn);
+    }
+}
+
+
+void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
+{
+    if (out_data) {
+        // expect: all buffers returned from proton!
+        assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
+        qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+        while (od) {
+            DEQ_REMOVE_HEAD(out_data->fifo);
+            if (od->body_data)
+                qd_message_body_data_release(od->body_data);
+            else
+                qd_buffer_list_free_buffers(&od->raw_buffers);
+            free_qdr_http1_out_data_t(od);
+            od = DEQ_HEAD(out_data->fifo);
+        }
+    }
+}
+
+
+// Return the number of buffers in the process of being written out by the proactor.
+// These buffers are "owned" by proton - they must not be freed until proton has
+// released them.
+//
+int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data)
+{
+    int count = 0;
+    if (out_data) {
+        qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+        while (od) {
+            count += od->next_buffer - od->free_count;
+            if (od == out_data->write_ptr)
+                break;
+
+            od = DEQ_NEXT(od);
+        }
+    }
+    return count;
+}
+
+
+// Initiate close of the raw connection.
+//
+void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error)
+{
+    if (error) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+               "[C%"PRIu64"] Connection closing: %s", hconn->conn_id, error);
+    }
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
+
+    if (hconn->raw_conn) {
+        hconn->close_connection = true;
+        pn_raw_connection_close(hconn->raw_conn);
+    }
+
+    // clean up all connection related stuff on PN_RAW_CONNECTION_DISCONNECTED
+    // event
+}
+
+
+void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
+                                 const qdr_error_t        *error)
+{
+    char *reason = 0;
+    if (error) {
+        size_t len = 0;
+        char *ename = qdr_error_name(error);
+        char *edesc = qdr_error_description(error);
+        if (ename) len += strlen(ename);
+        if (edesc) len += strlen(edesc);
+        if (len) {
+            reason = qd_malloc(len + 2);
+            reason[0] = 0;
+            if (ename) {
+                strcat(reason, ename);
+                strcat(reason, " ");
+            }
+            if (edesc)
+                strcat(reason, edesc);
+        }
+        free(ename);
+        free(edesc);
+    }
+
+    qdr_http1_error_response(hreq, HTTP1_STATUS_BAD_REQ,
+                             reason ? reason : "Invalid Request");
+    free(reason);
+}
+
+
+// send a server error response
+//
+void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
+                              int error_code,
+                              const char *reason)
+{
+    if (hreq->lib_rs) {
+        bool ignored;
+        h1_codec_tx_response(hreq->lib_rs, error_code, reason, 1, 1);
+        h1_codec_tx_done(hreq->lib_rs, &ignored);
+    }
+}
+
+
+const char *qdr_http1_token_list_next(const char *start, size_t *len, const char **next)
+{
+    static const char *SKIPME = ", \t";
+
+    *len = 0;
+    *next = 0;
+
+    if (!start) return 0;
+
+    while (*start && strchr(SKIPME, *start))
+        ++start;
+
+    if (!*start) return 0;
+
+    const char *end = start;
+    while (*end && !strchr(SKIPME, *end))
+        ++end;
+
+    *len = end - start;
+    *next = end;
+
+    while (**next && strchr(SKIPME, **next))
+        ++(*next);
+
+    return start;
+}
+
+
+//
+// Raw Connection Write Buffer Management
+//
+
+
+// Write pending data out the raw connection.  Preserve order by only writing
+// the head request data.
+//
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo)
+{
+    pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
+    size_t count = !hconn->raw_conn || pn_raw_connection_is_write_closed(hconn->raw_conn)
+        ? 0
+        : pn_raw_connection_write_buffers_capacity(hconn->raw_conn);
+
+    uint64_t total_octets = 0;
+    qdr_http1_out_data_t *od = fifo->write_ptr;
+    while (count > 0 && od) {
+        qd_buffer_t *wbuf   = 0;
+        int          od_len = MIN(count,
+                                  (od->buffer_count - od->next_buffer));
+        assert(od_len);  // error: no data @ head?
+
+        // send the out_data as a series of writes to proactor
+
+        while (od_len) {
+            size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
+            int written = 0;
+
+            if (od->body_data) {  // buffers stored in qd_message_t
+
+                written = qd_message_body_data_buffers(od->body_data, buffers, od->next_buffer, limit);
+                for (int i = 0; i < written; ++i) {
+                    // enforce this: we expect the context can be used by the adaptor!
+                    assert(buffers[i].context == 0);
+                    buffers[i].context = (uintptr_t)od;
+                    total_octets += buffers[i].size;
+                }
+
+            } else {   // list of buffers in od->raw_buffers
+                // advance to next buffer to send in od
+                if (!wbuf) {
+                    wbuf = DEQ_HEAD(od->raw_buffers);
+                    for (int i = 0; i < od->next_buffer; ++i)
+                        wbuf = DEQ_NEXT(wbuf);
+                }
+
+                pn_raw_buffer_t *rdisc = &buffers[0];
+                while (limit--) {
+                    rdisc->context  = (uintptr_t)od;
+                    rdisc->bytes    = (char*) qd_buffer_base(wbuf);
+                    rdisc->capacity = 0;
+                    rdisc->size     = qd_buffer_size(wbuf);
+                    rdisc->offset   = 0;
+
+                    total_octets += rdisc->size;
+                    ++rdisc;
+                    wbuf = DEQ_NEXT(wbuf);
+                    written += 1;
+                }
+            }
+
+            // keep me, you'll need it
+            if (HTTP1_DUMP_BUFFERS) {
+                for (size_t j = 0; j < written; ++j) {
+                    char *ptr = (char*) buffers[j].bytes;
+                    int len = (int) buffers[j].size;
+                    fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%d\n  value='%.*s'\n",
+                            hconn->conn_id, (void*)ptr, len, len, ptr);
+                    fflush(stdout);
+                }
+            }
+
+            written = pn_raw_connection_write_buffers(hconn->raw_conn, buffers, written);
+            count -= written;
+            od_len -= written;
+            od->next_buffer += written;
+        }
+
+        if (od->next_buffer == od->buffer_count) {
+            // all buffers in od have been passed to proton.
+            od = DEQ_NEXT(od);
+            fifo->write_ptr = od;
+            wbuf = 0;
+        }
+    }
+
+    hconn->out_http1_octets += total_octets;
+    return total_octets;
+}
+
+
+// The HTTP encoder has a list of buffers to be written to the raw connection.
+// Queue it to the outgoing data fifo.
+//
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist)
+{
+    int count = (int) DEQ_SIZE(*blist);
+    if (count) {
+        qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
+        ZERO(od);
+        od->owning_fifo = fifo;
+        od->buffer_count = (int) DEQ_SIZE(*blist);
+        od->raw_buffers = *blist;
+        DEQ_INIT(*blist);
+
+        DEQ_INSERT_TAIL(fifo->fifo, od);
+        if (!fifo->write_ptr)
+            fifo->write_ptr = od;
+    }
+}
+
+
+// The HTTP encoder has a message body data to be written to the raw connection.
+// Queue it to the outgoing data fifo.
+//
+void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data)
+{
+    int count = qd_message_body_data_buffer_count(body_data);
+    if (count) {
+        qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
+        ZERO(od);
+        od->owning_fifo = fifo;
+        od->body_data = body_data;
+        od->buffer_count = count;
+
+        DEQ_INSERT_TAIL(fifo->fifo, od);
+        if (!fifo->write_ptr)
+            fifo->write_ptr = od;
+    } else {
+        // empty body-data
+        qd_message_body_data_release(body_data);
+    }
+}
+
+
+// Called during proactor event PN_RAW_CONNECTION_WRITTEN
+//
+void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
+{
+    pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
+    size_t count;
+    while ((count = pn_raw_connection_take_written_buffers(hconn->raw_conn, buffers, RAW_BUFFER_BATCH)) != 0) {
+        for (size_t i = 0; i < count; ++i) {
+
+            // keep me, you'll need it
+            if (HTTP1_DUMP_BUFFERS) {
+                char *ptr = (char*) buffers[i].bytes;
+                int len = (int) buffers[i].size;
+                fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d\n  value='%.*s'\n",
+                        hconn->conn_id, (void*)ptr, len, len, ptr);
+                fflush(stdout);
+            }
+
+            qdr_http1_out_data_t *od = (qdr_http1_out_data_t*) buffers[i].context;
+            assert(od);
+            // Note: according to proton devs the order in which write buffers
+            // are released are NOT guaranteed to be in the same order in which
+            // they were written!
+
+            od->free_count += 1;
+            if (od->free_count == od->buffer_count) {
+                // all buffers returned
+                qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
+                DEQ_REMOVE(fifo->fifo, od);
+                if (od->body_data)
+                    qd_message_body_data_release(od->body_data);
+                else
+                    qd_buffer_list_free_buffers(&od->raw_buffers);
+                free_qdr_http1_out_data_t(od);
+            }
+        }
+    }
+}
+
+
+//
+// Protocol Adaptor Callbacks
+//
+
+
+// Invoked by the core thread to wake an I/O thread for the connection
+//
+static void _core_connection_activate_CT(void *context, qdr_connection_t *conn)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+    if (!hconn) return;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection activate", hconn->conn_id);
+
+    if (hconn->raw_conn)
+        pn_raw_connection_wake(hconn->raw_conn);
+    else
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] missing raw connection!", hconn->conn_id);
+}
+
+
+static void _core_link_first_attach(void               *context,
+                                    qdr_connection_t   *conn,
+                                    qdr_link_t         *link,
+                                    qdr_terminus_t     *source,
+                                    qdr_terminus_t     *target,
+                                    qd_session_class_t  ssn_class)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+    if (hconn)
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Link first attach", hconn->conn_id);
+}
+
+
+static void _core_link_second_attach(void          *context,
+                                     qdr_link_t     *link,
+                                     qdr_terminus_t *source,
+                                     qdr_terminus_t *target)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (!hconn) return;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"][L%"PRIu64"] Link second attach", hconn->conn_id, link->identity);
+
+    if (hconn->type == HTTP1_CONN_CLIENT) {
+        qdr_http1_client_core_second_attach((qdr_http1_adaptor_t*) context,
+                                            hconn, link, source, target);
+    }
+}
+
+
+static void _core_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link detach", hconn->conn_id, link->identity);
+
+        qdr_link_set_context(link, 0);
+        if (link == hconn->out_link)
+            hconn->out_link = 0;
+        else
+            hconn->in_link = 0;
+    }
+}
+
+
+static void _core_link_flow(void *context, qdr_link_t *link, int credit)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link flow (%d)",
+               hconn->conn_id, link->identity, credit);
+        if (hconn->type == HTTP1_CONN_SERVER)
+            qdr_http1_server_core_link_flow((qdr_http1_adaptor_t*) context, hconn, link, credit);
+        else
+            qdr_http1_client_core_link_flow((qdr_http1_adaptor_t*) context, hconn, link, credit);
+    }
+}
+
+
+static void _core_link_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link offer (%d)",
+               hconn->conn_id, link->identity, delivery_count);
+    }
+}
+
+
+static void _core_link_drained(void *context, qdr_link_t *link)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link drained",
+               hconn->conn_id, link->identity);
+    }
+}
+
+
+static void _core_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link drain %s",
+               hconn->conn_id, link->identity,
+               mode ? "ON" : "OFF");
+    }
+}
+
+
+static int _core_link_push(void *context, qdr_link_t *link, int limit)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link push %d", hconn->conn_id, link->identity, limit);
+        return qdr_link_process_deliveries(qdr_http1_adaptor->core, link, limit);
+    }
     return 0;
 }
 
-void qd_http1_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *conn) {}
+
+// The I/O thread wants to send this delivery out the link
+//
+static uint64_t _core_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    uint64_t outcome = PN_RELEASED;
+
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Core link deliver %p %s", hconn->conn_id, link->identity,
+               (void*)delivery, settled ? "settled" : "unsettled");
+
+        if (hconn->type == HTTP1_CONN_SERVER)
+            outcome = qdr_http1_server_core_link_deliver(qdr_http1_adaptor, hconn, link, delivery, settled);
+        else
+            outcome = qdr_http1_client_core_link_deliver(qdr_http1_adaptor, hconn, link, delivery, settled);
+    }
+
+    return outcome;
+}
+
+static int _core_link_get_credit(void *context, qdr_link_t *link)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+    int credit = 0;
+    if (hconn) {
+        credit = (link == hconn->in_link) ? hconn->in_link_credit : hconn->out_link_credit;
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Link get credit (%d)", hconn->conn_id, link->identity, credit);
+    }
+
+    return credit;
+}
+
+
+// Handle disposition/settlement update for the outstanding incoming HTTP message.
+//
+static void _core_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+    qdr_http1_request_base_t *hreq = (qdr_http1_request_base_t*) qdr_delivery_get_context(dlv);
+    if (hreq) {
+        qdr_http1_connection_t *hconn = hreq->hconn;
+        qdr_link_t *link = qdr_delivery_link(dlv);
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Core Delivery update disp=0x%"PRIx64" %s",
+               hconn->conn_id, link->identity, disp,
+               settled ? "settled" : "unsettled");
+
+        if (hconn->type == HTTP1_CONN_SERVER)
+            qdr_http1_server_core_delivery_update(qdr_http1_adaptor, hconn, hreq, dlv, disp, settled);
+        else
+            qdr_http1_client_core_delivery_update(qdr_http1_adaptor, hconn, hreq, dlv, disp, settled);
+    }
+}
+
+static void _core_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+    if (hconn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"] HTTP/1.x closing connection", hconn->conn_id);
+
+        char *qdr_error = error ? qdr_error_description(error) : 0;
+        qdr_http1_close_connection(hconn, qdr_error);
+        qdr_connection_set_context(conn, 0);
+        hconn->qdr_conn = 0;
+        hconn->in_link = hconn->out_link = 0;
+        free(qdr_error);
+    }
+}
+
+
+static void _core_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+    if (hconn) {
+        hconn->trace = trace;
+        if (trace)
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"] HTTP/1.x trace enabled", hconn->conn_id);
+    }
+}
+
+
+//
+// Adaptor Setup & Teardown
+//
+
+
+static void qd_http1_adaptor_init(qdr_core_t *core, void **adaptor_context)
+{
+    qdr_http1_adaptor_t *adaptor = NEW(qdr_http1_adaptor_t);
+
+    ZERO(adaptor);
+    adaptor->core    = core;
+    adaptor->log = qd_log_source(QD_HTTP_LOG_SOURCE);
+    adaptor->lock = sys_mutex();
+    DEQ_INIT(adaptor->listeners);
+    DEQ_INIT(adaptor->connectors);
+    DEQ_INIT(adaptor->connections);
+    adaptor->adaptor = qdr_protocol_adaptor(core,
+                                            "http/1.x",
+                                            adaptor,             // context
+                                            _core_connection_activate_CT,  // core thread only
+                                            _core_link_first_attach,
+                                            _core_link_second_attach,
+                                            _core_link_detach,
+                                            _core_link_flow,
+                                            _core_link_offer,
+                                            _core_link_drained,
+                                            _core_link_drain,
+                                            _core_link_push,
+                                            _core_link_deliver,
+                                            _core_link_get_credit,
+                                            _core_delivery_update,
+                                            _core_conn_close,
+                                            _core_conn_trace);
+    *adaptor_context = adaptor;
+    qdr_http1_adaptor = adaptor;
+}
+
+
+static void qd_http1_adaptor_final(void *adaptor_context)
+{
+    qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context;
+    qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+
+    qd_http_lsnr_t *li = DEQ_HEAD(adaptor->listeners);
+    while (li) {
+        qd_http1_delete_listener(0, li);
+        li = DEQ_HEAD(adaptor->listeners);
+    }
+    qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
+    while (ct) {
+        qd_http1_delete_connector(0, ct);
+        ct = DEQ_HEAD(adaptor->connectors);
+    }
+    qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
+    while (hconn) {
+        qdr_http1_connection_free(hconn);
+        hconn = DEQ_HEAD(adaptor->connections);
+    }
+
+    sys_mutex_free(adaptor->lock);
+    qdr_http1_adaptor =  NULL;
+
+    free(adaptor);
+}
+
+
+/**
+ * Declare the adaptor so that it will self-register on process startup.
+ */
+QDR_CORE_ADAPTOR_DECLARE("http1.x-adaptor", qd_http1_adaptor_init, qd_http1_adaptor_final)
 
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
new file mode 100644
index 0000000..0a06787
--- /dev/null
+++ b/src/adaptors/http1/http1_client.c
@@ -0,0 +1,1306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http1_private.h"
+#include "adaptors/adaptor_utils.h"
+
+#include <proton/listener.h>
+#include <proton/proactor.h>
+
+
+//
+// This file contains code specific to HTTP client processing.  The raw
+// connection is terminated at an HTTP client, not an HTTP server.
+//
+
+#define DEFAULT_CAPACITY 250
+#define LISTENER_BACKLOG  16
+
+
+//
+// State for a single response message to be sent to the client via the raw
+// connection.
+//
+typedef struct _client_response_msg_t {
+    DEQ_LINKS(struct _client_response_msg_t);
+
+    qdr_delivery_t *dlv;              // from core via core_link_deliver
+    uint64_t        dispo;            // set by adaptor on encode complete
+    bool            headers_encoded;  // all headers completely encoded
+    bool            encoded;          // true when full response encoded
+
+    // HTTP encoded message data
+    qdr_http1_out_data_fifo_t out_data;
+
+} _client_response_msg_t;
+ALLOC_DECLARE(_client_response_msg_t);
+ALLOC_DEFINE(_client_response_msg_t);
+DEQ_DECLARE(_client_response_msg_t, _client_response_msg_list_t);
+
+
+//
+// State for an HTTP/1.x Request+Response exchange, client facing
+//
+typedef struct _client_request_t {
+    qdr_http1_request_base_t base;
+
+    // The request arrives via the raw connection.  These fields are used to
+    // build the message and deliver it into the core.
+    //
+    qd_message_t        *request_msg;      // holds inbound message as it is built
+    qdr_delivery_t      *request_dlv;      // qdr_link_deliver()
+    qd_composed_field_t *request_props;    // holds HTTP headers as they arrive
+    uint64_t             request_dispo;    // set by core (core_update_delivery)
+    bool                 request_settled;  // set by core (core_update_delivery)
+
+    // A single request may result in more than one response (1xx Continue for
+    // example).  These responses are written to the raw connection from HEAD
+    // to TAIL.
+    //
+    _client_response_msg_list_t responses;
+
+    bool codec_completed;     // encoder/decoder done
+    bool cancelled;
+    bool close_on_complete;   // close the conn when this request is complete
+
+} _client_request_t;
+ALLOC_DECLARE(_client_request_t);
+ALLOC_DEFINE(_client_request_t);
+
+
+static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
+static void _client_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs,
+                                 const char *method,
+                                 const char *target,
+                                 uint32_t version_major,
+                                 uint32_t version_minor);
+static int _client_rx_response_cb(h1_codec_request_state_t *lib_rs,
+                                  int status_code,
+                                  const char *reason_phrase,
+                                  uint32_t version_major,
+                                  uint32_t version_minor);
+static int _client_rx_header_cb(h1_codec_request_state_t *lib_rs, const char *key, const char *value);
+static int _client_rx_headers_done_cb(h1_codec_request_state_t *lib_rs, bool has_body);
+static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, size_t offset, uintmax_t len,
+                              bool more);
+static void _client_rx_done_cb(h1_codec_request_state_t *lib_rs);
+static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled);
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
+static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg);
+static void _client_request_free(_client_request_t *req);
+static void _client_connection_free(qdr_http1_connection_t *hconn);
+static void _write_pending_response(_client_request_t *req);
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Client Listener
+////////////////////////////////////////////////////////
+
+
+// Listener received connection request from client
+//
+static qdr_http1_connection_t *_create_client_connection(qd_http_lsnr_t *li)
+{
+    qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
+
+    ZERO(hconn);
+    hconn->type = HTTP1_CONN_CLIENT;
+    hconn->qd_server = li->server;
+    hconn->adaptor = qdr_http1_adaptor;
+    hconn->handler_context.handler = &_handle_connection_events;
+    hconn->handler_context.context = hconn;
+
+    hconn->client.next_msg_id = 99383939;
+
+    // configure the HTTP/1.x library
+
+    h1_codec_config_t config = {0};
+    config.type             = HTTP1_CONN_CLIENT;
+    config.tx_buffers       = _client_tx_buffers_cb;
+    config.tx_body_data     = _client_tx_body_data_cb;
+    config.rx_request       = _client_rx_request_cb;
+    config.rx_response      = _client_rx_response_cb;
+    config.rx_header        = _client_rx_header_cb;
+    config.rx_headers_done  = _client_rx_headers_done_cb;
+    config.rx_body          = _client_rx_body_cb;
+    config.rx_done          = _client_rx_done_cb;
+    config.request_complete = _client_request_complete_cb;
+
+    hconn->http_conn = h1_codec_connection(&config, hconn);
+    if (!hconn->http_conn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+               "Failed to initialize HTTP/1.x library - connection refused.");
+        qdr_http1_connection_free(hconn);
+        return 0;
+    }
+
+    hconn->cfg.host = qd_strdup(li->config.host);
+    hconn->cfg.port = qd_strdup(li->config.port);
+    hconn->cfg.address = qd_strdup(li->config.address);
+
+    hconn->raw_conn = pn_raw_connection();
+    pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+    DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    // we'll create a QDR connection and links once the raw connection activates
+    return hconn;
+}
+
+
+// Process proactor events for the client listener
+//
+static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+    qd_log_source_t *log = qdr_http1_adaptor->log;
+    qd_http_lsnr_t *li = (qd_http_lsnr_t*) context;
+    const char *host_port = li->config.host_port;
+
+    qd_log(log, QD_LOG_DEBUG, "HTTP/1.x Client Listener Event %s\n", pn_event_type_name(pn_event_type(e)));
+
+    switch (pn_event_type(e)) {
+
+    case PN_LISTENER_OPEN: {
+        qd_log(log, QD_LOG_NOTICE, "Listening for HTTP/1.x client requests on %s", host_port);
+        break;
+    }
+
+    case PN_LISTENER_ACCEPT: {
+        qd_log(log, QD_LOG_INFO, "Accepting HTTP/1.x connection on %s", host_port);
+        qdr_http1_connection_t *hconn = _create_client_connection(li);
+        if (hconn) {
+            // Note: the proactor may schedule the hconn on another thread
+            // during this call!
+            pn_listener_raw_accept(li->pn_listener, hconn->raw_conn);
+        }
+        break;
+    }
+
+    case PN_LISTENER_CLOSE: {
+        if (li->pn_listener) {
+            pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+            if (pn_condition_is_set(cond)) {
+                qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+                       pn_condition_get_description(cond),
+                       pn_condition_get_name(cond));
+            } else {
+                qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
+            }
+            pn_listener_set_context(li->pn_listener, 0);
+            li->pn_listener = 0;
+        }
+        break;
+    }
+
+    default:
+        break;
+    }
+}
+
+
+// Management Agent API - Create
+//
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+    qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &_handle_listener_events);
+    if (!li) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http listener: no memory");
+        return 0;
+    }
+    li->config = *config;
+
+    DEQ_ITEM_INIT(li);
+
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+    DEQ_INSERT_TAIL(qdr_http1_adaptor->listeners, li);
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port);
+    // Note: the proactor may schedule the pn_listener on another thread during this call
+    pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, LISTENER_BACKLOG);
+    return li;
+}
+
+
+// Management Agent API - Delete
+//
+void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_lsnr_t *li)
+{
+    if (li) {
+        if (li->pn_listener) {
+            pn_listener_close(li->pn_listener);
+            li->pn_listener = 0;
+        }
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+        qd_http_listener_decref(li);
+    }
+}
+
+
+////////////////////////////////////////////////////////
+// Raw Connector Events
+////////////////////////////////////////////////////////
+
+
+// Raw Connection Initialization
+//
+static void _setup_client_connection(qdr_http1_connection_t *hconn)
+{
+    hconn->client.client_ip_addr = qda_raw_conn_get_address(hconn->raw_conn);
+    qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
+                                                      false, //bool             is_authenticated,
+                                                      true,  //bool             opened,
+                                                      "",   //char            *sasl_mechanisms,
+                                                      QD_INCOMING, //qd_direction_t   dir,
+                                                      hconn->client.client_ip_addr,    //const char      *host,
+                                                      "",    //const char      *ssl_proto,
+                                                      "",    //const char      *ssl_cipher,
+                                                      "",    //const char      *user,
+                                                      "HTTP/1.x Adaptor",    //const char      *container,
+                                                      pn_data(0),     //pn_data_t       *connection_properties,
+                                                      0,     //int              ssl_ssf,
+                                                      false, //bool             ssl,
+                                                      // set if remote is a qdrouter
+                                                      0);    //const qdr_router_version_t *version)
+
+    hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
+    hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
+                                            qdr_http1_adaptor->adaptor,
+                                            true,  // incoming
+                                            QDR_ROLE_NORMAL,
+                                            1,     //cost
+                                            hconn->conn_id,
+                                            0,  // label
+                                            0,  // remote container id
+                                            false,  // strip annotations in
+                                            false,  // strip annotations out
+                                            false,  // allow dynamic link routes
+                                            false,  // allow admin status update
+                                            DEFAULT_CAPACITY,
+                                            0,      // vhost
+                                            info,
+                                            0,      // bind context
+                                            0);     // bind token
+    qdr_connection_set_context(hconn->qdr_conn, hconn);
+
+    qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
+
+    // simulate a client subscription for reply-to
+    qdr_terminus_t *dynamic_source = qdr_terminus(0);
+    qdr_terminus_set_dynamic(dynamic_source);
+    hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
+                                            QD_OUTGOING,
+                                            dynamic_source,   //qdr_terminus_t   *source,
+                                            qdr_terminus(0),  //qdr_terminus_t   *target,
+                                            "http1.client.reply-to", //const char       *name,
+                                            0,                  //const char       *terminus_addr,
+                                            false,              // no-route
+                                            NULL,               // initial delivery
+                                            &(hconn->out_link_id));
+    qdr_link_set_context(hconn->out_link, hconn);
+
+    qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP client response link created",
+           hconn->conn_id, hconn->out_link_id);
+
+    // simulate a client publisher link to the HTTP server:
+    qdr_terminus_t *target = qdr_terminus(0);
+    qdr_terminus_set_address(target, hconn->cfg.address);
+    hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
+                                           QD_INCOMING,
+                                           qdr_terminus(0),  //qdr_terminus_t   *source,
+                                           target,           //qdr_terminus_t   *target,
+                                           "http1.client.in", //const char       *name,
+                                           0,                //const char       *terminus_addr,
+                                           false,
+                                           NULL,
+                                           &(hconn->in_link_id));
+    qdr_link_set_context(hconn->in_link, hconn);
+
+    qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP client request link created",
+           hconn->conn_id, hconn->in_link_id);
+
+    // wait until the dynamic reply-to address is returned in the second attach
+    // to grant buffers to the raw connection
+}
+
+
+// Proton Connection Event Handler
+//
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+    qd_log_source_t *log = qdr_http1_adaptor->log;
+
+    qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
+
+    if (!hconn) return;
+
+    switch (pn_event_type(e)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+        _setup_client_connection(hconn);
+        break;
+    }
+    case PN_RAW_CONNECTION_CLOSED_READ:
+    case PN_RAW_CONNECTION_CLOSED_WRITE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for %s", hconn->conn_id,
+               pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
+               ? "reading" : "writing");
+        pn_raw_connection_close(hconn->raw_conn);
+        break;
+    }
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+        qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", hconn->conn_id);
+        pn_raw_connection_set_context(hconn->raw_conn, 0);
+        hconn->raw_conn = 0;
+
+        if (hconn->out_link) {
+            qdr_link_set_context(hconn->out_link, 0);
+            qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+            hconn->out_link = 0;
+        }
+        if (hconn->in_link) {
+            qdr_link_set_context(hconn->in_link, 0);
+            qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+            hconn->in_link = 0;
+        }
+        if (hconn->qdr_conn) {
+            qdr_connection_set_context(hconn->qdr_conn, 0);
+            qdr_connection_closed(hconn->qdr_conn);
+            hconn->qdr_conn = 0;
+        }
+
+        _client_connection_free(hconn);
+        return;  // hconn no longer valid
+    }
+    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Need write buffers", hconn->conn_id);
+        _write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests));
+        break;
+    }
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
+        // @TODO(kgiusti): backpressure if no credit
+        if (hconn->client.reply_to_addr && !hconn->close_connection /* && hconn->in_link_credit > 0 */) {
+            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+                   hconn->conn_id, granted);
+        }
+        break;
+    }
+    case PN_RAW_CONNECTION_WAKE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+        while (qdr_connection_process(hconn->qdr_conn)) {}
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
+        break;
+    }
+    case PN_RAW_CONNECTION_READ: {
+        qd_buffer_list_t blist;
+        uintmax_t length;
+        qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+        if (length) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client",
+                   hconn->conn_id, hconn->in_link_id, length);
+            hconn->in_http1_octets += length;
+            int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+            if (error)
+                qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+        }
+        break;
+    }
+    case PN_RAW_CONNECTION_WRITTEN: {
+        qdr_http1_free_written_buffers(hconn);
+        break;
+    }
+    default:
+        break;
+    }
+
+    // Check the head request for completion and advance to next request if
+    // done.
+
+    // remove me:
+    if (hconn) {
+        _client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request complete????", hconn->conn_id);
+            qd_log(log, QD_LOG_DEBUG, "   codec=%s req-dlv=%p resp-dlv=%d req_msg=%p %s",
+                   hreq->codec_completed ? "Done" : "Not Done",
+                   (void*)hreq->request_dlv,
+                   (int)DEQ_SIZE(hreq->responses),
+                   (void*)hreq->request_msg,
+                   hreq->cancelled ? "Cancelled" : "Not Cancelled");
+        }
+    }
+
+    // check if the head request is done
+
+    bool need_close = false;
+    _client_request_t *hreq = (_client_request_t *)(hconn ? DEQ_HEAD(hconn->requests) : 0);
+    if (hreq) {
+        // Can we retire the current outgoing response messages?
+        _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+        while (rmsg &&
+               rmsg->encoded &&
+               DEQ_IS_EMPTY(rmsg->out_data.fifo)) {
+            // encoded ==> entire message received from core and
+            // dispo and settlement updated to core.
+            // empty out_data ==> all outstanding raw buffers sent
+            _client_response_msg_free(hreq, rmsg);
+            rmsg = DEQ_HEAD(hreq->responses);
+        }
+
+        if (hreq->codec_completed &&
+            DEQ_IS_EMPTY(hreq->responses) &&
+            hreq->request_settled) {
+
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+
+            need_close = hreq->close_on_complete;
+            _client_request_free(hreq);
+        }
+    }
+
+    if (need_close)
+        qdr_http1_close_connection(hconn, "Connection: close");
+    else {
+        hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+
+            if (hreq->request_msg && hconn->in_link_credit > 0) {
+
+                qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+                       "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+                       hconn->conn_id, hconn->in_link_id);
+
+                hconn->in_link_credit -= 1;
+                hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+                qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+                qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+                hreq->request_msg = 0;
+            }
+
+            _write_pending_response(hreq);
+        }
+    }
+}
+
+
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Encoder/Decoder Callbacks
+////////////////////////////////////////////////////////
+
+
+// Encoder callback: send blist buffers (response msg) to client endpoint
+//
+static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    if (!hconn->raw_conn) {
+        // client connection has been lost
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%i] Discarding outgoing data - client connection closed", hconn->conn_id);
+        qd_buffer_list_free_buffers(blist);
+        return;
+    }
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
+           hconn->conn_id, hconn->out_link_id, len);
+
+    // responses are decoded one at a time - the current response it at the
+    // tail of the response list
+
+    _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    assert(rmsg);
+    qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
+
+    // if this happens to be the current outgoing response try writing to the
+    // raw connection
+
+    if (rmsg == DEQ_HEAD(hreq->responses))
+        _write_pending_response(hreq);
+}
+
+
+// Encoder callback: send body_data buffers (response msg) to client endpoint
+//
+static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    if (!hconn->raw_conn) {
+        // client connection has been lost
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%i] Discarding outgoing data - client connection closed", hconn->conn_id);
+        qd_message_body_data_release(body_data);
+        return;
+    }
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Sending body data to client",
+           hconn->conn_id, hconn->out_link_id);
+
+    // responses are decoded one at a time - the current response it at the
+    // tail of the response list
+
+    _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    assert(rmsg);
+    qdr_http1_enqueue_body_data(&rmsg->out_data, body_data);
+
+    // if this happens to be the current outgoing response try writing to the
+    // raw connection
+
+    if (rmsg == DEQ_HEAD(hreq->responses))
+        _write_pending_response(hreq);
+}
+
+
+// Called when decoding an HTTP request from a client.  This indicates the
+// start of a new request message.
+//
+static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
+                                 const char *method,
+                                 const char *target,
+                                 uint32_t version_major,
+                                 uint32_t version_minor)
+{
+    h1_codec_connection_t    *h1c = h1_codec_request_state_get_connection(hrs);
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)h1_codec_connection_get_context(h1c);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"] HTTP request received: method=%s target=%s version=%"PRIi32".%"PRIi32,
+           hconn->conn_id, method, target, version_major, version_minor);
+
+    _client_request_t *creq = new__client_request_t();
+    ZERO(creq);
+    creq->base.msg_id = hconn->client.next_msg_id++;
+    creq->base.lib_rs = hrs;
+    creq->base.hconn = hconn;
+    DEQ_INIT(creq->responses);
+
+    creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+    qd_compose_start_map(creq->request_props);
+    {
+        // OASIS specifies this value as "1.1" by default...
+        char version[64];
+        snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
+        qd_compose_insert_symbol(creq->request_props, REQUEST_HEADER_KEY);
+        qd_compose_insert_string(creq->request_props, version);
+
+        qd_compose_insert_symbol(creq->request_props, TARGET_HEADER_KEY);
+        qd_compose_insert_string(creq->request_props, target);
+    }
+
+    h1_codec_request_state_set_context(hrs, (void*) creq);
+    DEQ_INSERT_TAIL(hconn->requests, &creq->base);
+    return 0;
+}
+
+
+// Cannot happen for a client connection!
+static int _client_rx_response_cb(h1_codec_request_state_t *hrs,
+                                  int status_code,
+                                  const char *reason_phrase,
+                                  uint32_t version_major,
+                                  uint32_t version_minor)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+           "[C%"PRIu64"][L%"PRIu64"] Spurious HTTP response received from client",
+           hconn->conn_id, hconn->in_link_id);
+    return HTTP1_STATUS_BAD_REQ;
+}
+
+
+// called for each decoded HTTP header.
+//
+static int _client_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request header received: key='%s' value='%s'",
+           hconn->conn_id, hconn->in_link_id, key, value);
+
+    if (strcasecmp(key, "connection") == 0) {
+        // We need to filter the connection header out.  See if client
+        // requested 'close' - this means it expects us to close the connection
+        // when the response has been sent
+        //
+        // @TODO(kgiusti): also have to remove other headers given in value!
+        //
+        size_t len;
+        const char *token = qdr_http1_token_list_next(value, &len, &value);
+        while (token) {
+            if (len == 5 && strncasecmp(token, "close", 5) == 0) {
+                hreq->close_on_complete = true;
+                break;
+            }
+            token = qdr_http1_token_list_next(value, &len, &value);
+        }
+
+    } else {
+        qd_compose_insert_symbol(hreq->request_props, key);
+        qd_compose_insert_string(hreq->request_props, value);
+    }
+
+    return 0;
+}
+
+
+// Called after the last header is decoded, before decoding any body data.
+// At this point there is enough data to start forwarding the message to
+// the router.
+//
+static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
+{
+    _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request headers done.",
+           hconn->conn_id, hconn->in_link_id);
+
+    // now that all the headers have been received we can construct
+    // the AMQP message
+
+    hreq->request_msg = qd_message();
+
+    qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(hdrs);
+    qd_compose_insert_bool(hdrs, 0);     // durable
+    qd_compose_insert_null(hdrs);        // priority
+    //qd_compose_insert_null(hdrs);        // ttl
+    //qd_compose_insert_bool(hdrs, 0);     // first-acquirer
+    //qd_compose_insert_uint(hdrs, 0);     // delivery-count
+    qd_compose_end_list(hdrs);
+
+    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
+    qd_compose_start_list(props);
+
+    qd_compose_insert_ulong(props, hreq->base.msg_id);    // message-id
+    qd_compose_insert_null(props);                 // user-id
+    // @TODO(kgiusti) set to: to target?
+    qd_compose_insert_string(props, hconn->cfg.address); // to
+    qd_compose_insert_string(props, h1_codec_request_state_method(hrs));  // subject
+    qd_compose_insert_string(props, hconn->client.reply_to_addr);   // reply-to
+    qd_compose_end_list(props);
+
+    qd_compose_end_map(hreq->request_props);
+
+    if (!has_body) {
+        // @TODO(kgiusti): fixme: tack on an empty body data performative.  The
+        // message decoder will barf otherwise
+        qd_buffer_list_t empty = DEQ_EMPTY;
+        hreq->request_props = qd_compose(QD_PERFORMATIVE_BODY_DATA, hreq->request_props);
+        qd_compose_insert_binary_buffers(hreq->request_props, &empty);
+    }
+
+    qd_message_compose_3(hreq->request_msg, props, hreq->request_props, !has_body);
+    qd_compose_free(props);
+    qd_compose_free(hreq->request_props);
+    hreq->request_props = 0;
+
+    // Use up one credit to obtain a delivery and forward to core.  If no
+    // credit is available the request is stalled until the core grants more
+    // flow.
+    if (hreq == (_client_request_t*) DEQ_HEAD(hconn->requests) && hconn->in_link_credit > 0) {
+        hconn->in_link_credit -= 1;
+
+        qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+               hconn->conn_id, hconn->in_link_id);
+
+        hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+        qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+        qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+        hreq->request_msg = 0;
+    }
+
+    return 0;
+}
+
+
+// Called with decoded body data.  This may be called multiple times as body
+// data becomes available.
+//
+static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+                              bool more)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    qd_message_t             *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
+           hconn->conn_id, hconn->in_link_id, len);
+
+    if (offset) {
+        // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
+        // Remove the offset by shifting the content of the head buffer forward
+        //
+        qd_buffer_t *head = DEQ_HEAD(*body);
+        memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
+        head->size -= offset;
+    }
+
+    //
+    // Compose a DATA performative for this section of the stream
+    //
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+    qd_compose_insert_binary_buffers(field, body);
+
+    //
+    // Extend the streaming message and free the composed field
+    //
+    qd_message_extend(msg, field);
+    qd_compose_free(field);
+
+    //
+    // Notify the router that more data is ready to be pushed out on the delivery
+    //
+    if (!more)
+        qd_message_set_receive_complete(msg);
+
+    if (hreq->request_dlv)
+        qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
+
+    return 0;
+}
+
+
+// Called at the completion of request message decoding.
+//
+static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
+{
+    _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    qd_message_t             *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request receive complete.",
+           hconn->conn_id, hconn->in_link_id);
+
+    if (!qd_message_receive_complete(msg)) {
+        qd_message_set_receive_complete(msg);
+        if (hreq->request_dlv) {
+            qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
+        }
+    }
+}
+
+
+// The coded has completed processing the request and response messages.
+//
+static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled)
+{
+    _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(lib_rs);
+    if (hreq) {
+        hreq->base.lib_rs = 0;  // freed on return from this call
+        hreq->cancelled = hreq->cancelled || cancelled;
+        hreq->codec_completed = !hreq->cancelled;
+
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"] HTTP request/response %s.", hreq->base.hconn->conn_id,
+               cancelled ? "cancelled!" : "codec done");
+    }
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// Router Protocol Adapter Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+void qdr_http1_client_core_second_attach(qdr_http1_adaptor_t    *adaptor,
+                                         qdr_http1_connection_t *hconn,
+                                         qdr_link_t             *link,
+                                         qdr_terminus_t         *source,
+                                         qdr_terminus_t         *target)
+{
+    if (link == hconn->out_link) {
+        // this is the reply-to link for the client
+        qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
+        hconn->client.reply_to_addr = (char*) qd_iterator_copy(reply_iter);
+
+        assert(hconn->client.reply_to_addr);
+
+        hconn->out_link_credit += DEFAULT_CAPACITY;
+        qdr_link_flow(adaptor->core, link, DEFAULT_CAPACITY, false);
+    }
+}
+
+
+void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t    *adaptor,
+                                     qdr_http1_connection_t *hconn,
+                                     qdr_link_t             *link,
+                                     int                     credit)
+{
+    qd_log(adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"][L%"PRIu64"] Credit granted on request link %d",
+           hconn->conn_id, hconn->in_link_id, credit);
+
+    assert(link == hconn->in_link);   // router only grants flow on incoming link
+
+    hconn->in_link_credit += credit;
+    if (hconn->in_link_credit > 0) {
+
+        if (hconn->raw_conn) {
+            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+            qd_log(adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] %d read buffers granted",
+                   hconn->conn_id, granted);
+        }
+
+        // is the current request message blocked by lack of credit?
+
+        _client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
+        if (hreq && hreq->request_msg) {
+            assert(!hreq->request_dlv);
+            hconn->in_link_credit -= 1;
+
+            qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+                   hconn->conn_id, hconn->in_link_id);
+
+            hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+            qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+            qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+            hreq->request_msg = 0;
+        }
+    }
+}
+
+
+// Handle disposition/settlement update for the outstanding request msg
+//
+void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
+                                           qdr_http1_connection_t   *hconn,
+                                           qdr_http1_request_base_t *req,
+                                           qdr_delivery_t           *dlv,
+                                           uint64_t                  disp,
+                                           bool                      settled)
+{
+    _client_request_t *hreq = (_client_request_t *)req;
+    assert(dlv == hreq->request_dlv);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request delivery update, outcome=0x%"PRIx64"%s",
+           hconn->conn_id, hconn->in_link_id, disp, settled ? " settled" : "");
+
+    if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
+        // terminal disposition
+        hreq->request_dispo = disp;
+        if (disp != PN_ACCEPTED) {
+            // no response message is going to arrive.  Now what?  For now fake
+            // a response from the server by using the codec to write an error
+            // response on the behalf of the server.
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64,
+                   hconn->conn_id, hconn->in_link_id, disp);
+
+            _client_response_msg_t *rmsg = new__client_response_msg_t();
+            ZERO(rmsg);
+            DEQ_INIT(rmsg->out_data.fifo);
+            DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+            if (disp == PN_REJECTED) {
+                qdr_http1_error_response(&hreq->base, 400, "Bad Request");
+            } else {
+                // total guess as to what the proper error code should be
+                qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+            }
+        }
+    }
+
+    hreq->request_settled = settled || hreq->request_settled;
+}
+
+
+//
+// Response message forwarding
+//
+
+
+// use the correlation ID from the AMQP message containing the response to look
+// up the original request context
+//
+static _client_request_t *_lookup_request_context(qdr_http1_connection_t *hconn,
+                                                  qd_message_t *msg)
+{
+    qdr_http1_request_base_t *req = 0;
+
+    qd_parsed_field_t *cid_pf = 0;
+    qd_iterator_t *cid_iter = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
+    if (cid_iter) {
+        cid_pf = qd_parse(cid_iter);
+        if (cid_pf && qd_parse_ok(cid_pf)) {
+            uint64_t cid = qd_parse_as_ulong(cid_pf);
+            if (qd_parse_ok(cid_pf)) {
+                req = DEQ_HEAD(hconn->requests);
+                while (req) {
+                    if (req->msg_id == cid)
+                        break;
+                    req = DEQ_NEXT(req);
+                }
+            }
+        }
+    }
+
+    qd_parse_free(cid_pf);
+    qd_iterator_free(cid_iter);
+
+    return (_client_request_t*) req;
+}
+
+
+// Encode the response status and all HTTP headers.
+// The message has been validated to app properties depth
+//
+static bool _encode_response_headers(_client_request_t *hreq,
+                                     _client_response_msg_t *rmsg)
+{
+    bool ok = false;
+    qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+    qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+    if (app_props_iter) {
+        qd_parsed_field_t *app_props = qd_parse(app_props_iter);
+        if (app_props && qd_parse_is_map(app_props)) {
+            qd_parsed_field_t *tmp = qd_parse_value_by_key(app_props, STATUS_HEADER_KEY);
+            if (tmp) {
+                int32_t status_code = qd_parse_as_int(tmp);
+                if (qd_parse_ok(tmp)) {
+
+                    // the value for RESPONSE_HEADER_KEY is optional and is set
+                    // to a string representation of the version of the server
+                    // (e.g. "1.1"
+                    uint32_t major = 1;
+                    uint32_t minor = 1;
+                    tmp = qd_parse_value_by_key(app_props, RESPONSE_HEADER_KEY);
+                    if (tmp) {
+                        char *version_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
+                        if (version_str) {
+                            sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
+                            free(version_str);
+                        }
+                    }
+                    char *reason_str = 0;
+                    tmp = qd_parse_value_by_key(app_props, REASON_HEADER_KEY);
+                    if (tmp) {
+                        reason_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
+                    }
+
+                    qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
+                           "[C%"PRIu64"][L%"PRIu64"] Encoding response %d %s",
+                           hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, (int)status_code,
+                           reason_str ? reason_str : "");
+
+                    ok = !h1_codec_tx_response(hreq->base.lib_rs, (int)status_code, reason_str, major, minor);
+                    free(reason_str);
+
+                    // now send all headers in app properties
+                    qd_parsed_field_t *key = qd_field_first_child(app_props);
+                    while (ok && key) {
+                        qd_parsed_field_t *value = qd_field_next_child(key);
+                        if (!value)
+                            break;
+
+                        qd_iterator_t *i_key = qd_parse_raw(key);
+                        if (!i_key)
+                            break;
+
+                        // ignore the special headers added by the mapping
+                        if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
+                            qd_iterator_t *i_value = qd_parse_raw(value);
+                            if (!i_value)
+                                break;
+
+                            char *header_key = (char*) qd_iterator_copy(i_key);
+                            char *header_value = (char*) qd_iterator_copy(i_value);
+
+
+                            qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
+                                   "[C%"PRIu64"][L%"PRIu64"] Encoding response header %s:%s",
+                                   hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
+                                   header_key, header_value);
+
+                            ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
+
+                            free(header_key);
+                            free(header_value);
+                        }
+
+                        key = qd_field_next_child(value);
+                    }
+                }
+            }
+        }
+        qd_parse_free(app_props);
+        qd_iterator_free(app_props_iter);
+    }
+
+    return ok;
+}
+
+
+static uint64_t _encode_response_message(_client_request_t *hreq,
+                                         _client_response_msg_t *rmsg)
+{
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
+
+    if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+        return 0;
+
+    if (status == QD_MESSAGE_DEPTH_INVALID) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
+               hconn->conn_id, hconn->out_link_id);
+        return PN_REJECTED;
+    }
+
+    assert(status == QD_MESSAGE_DEPTH_OK);
+
+    if (!rmsg->headers_encoded) {
+        rmsg->headers_encoded = true;
+        if (!_encode_response_headers(hreq, rmsg)) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] message headers malformed - discarding.",
+                   hconn->conn_id, hconn->out_link_id);
+            return PN_REJECTED;
+        }
+    }
+
+    qd_message_body_data_t *body_data = 0;
+
+    while (true) {
+        switch (qd_message_next_body_data(msg, &body_data)) {
+
+        case QD_MESSAGE_BODY_DATA_OK:
+
+            qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
+                   hconn->conn_id, hconn->out_link_id);
+
+            if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
+                       hconn->conn_id, hconn->out_link_id);
+                return PN_REJECTED;
+            }
+            break;
+
+        case QD_MESSAGE_BODY_DATA_NO_MORE:
+            // indicate this message is complete
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
+                   hconn->conn_id, hconn->out_link_id);
+            return PN_ACCEPTED;
+
+        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+            return 0;  // wait for more
+
+        case QD_MESSAGE_BODY_DATA_INVALID:
+        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
+                   hconn->conn_id, hconn->out_link_id);
+            return PN_REJECTED;
+        }
+    }
+}
+
+
+// The I/O thread wants to send this delivery containing the response out the
+// link.  It is unlikely that the parsing of this message will fail since the
+// message was constructed by the ingress router.  However if the message fails
+// to parse then there is probably no recovering as the client will now be out
+// of sync.  For now close the connection if an error occurs.
+//
+uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
+                                            qdr_http1_connection_t *hconn,
+                                            qdr_link_t             *link,
+                                            qdr_delivery_t         *delivery,
+                                            bool                    settled)
+{
+    qd_message_t        *msg = qdr_delivery_message(delivery);
+    _client_request_t  *hreq = (_client_request_t*) qdr_delivery_get_context(delivery);
+    if (!hreq) {
+        // new delivery - look for corresponding request via correlation_id
+        switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
+        case QD_MESSAGE_DEPTH_INCOMPLETE:
+            return 0;
+
+        case QD_MESSAGE_DEPTH_INVALID:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
+                   hconn->conn_id, link->identity);
+            qd_message_set_send_complete(msg);
+            qdr_http1_close_connection(hconn, "Malformed response message");
+            return PN_REJECTED;
+
+        case QD_MESSAGE_DEPTH_OK:
+            hreq = _lookup_request_context(hconn, msg);
+            if (!hreq) {
+                // No corresponding request found
+                // @TODO(kgiusti) how to handle this?  - simply discard?
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
+                qd_message_set_send_complete(msg);
+                qdr_http1_close_connection(hconn, "Cannot correlate response message");
+                return PN_REJECTED;
+            }
+
+            // link request state and delivery
+            _client_response_msg_t *rmsg = new__client_response_msg_t();
+            ZERO(rmsg);
+            rmsg->dlv = delivery;
+            DEQ_INIT(rmsg->out_data.fifo);
+            qdr_delivery_set_context(delivery, hreq);
+            qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
+            DEQ_INSERT_TAIL(hreq->responses, rmsg);
+            break;
+        }
+    }
+
+    // deliveries arrive one at a time and are added to the tail
+    _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    assert(rmsg && rmsg->dlv == delivery);
+
+    if (!rmsg->dispo)
+        rmsg->dispo = _encode_response_message(hreq, rmsg);
+
+    if (rmsg->dispo && qd_message_receive_complete(msg)) {   // encode of message complete
+        rmsg->encoded = true;
+        qd_message_set_send_complete(msg);
+        qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] HTTP client settling response, dispo=0x%"PRIx64,
+               hconn->conn_id, hconn->out_link_id, rmsg->dispo);
+
+        qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                          rmsg->dlv,
+                                          rmsg->dispo,
+                                          true,   // settled,
+                                          0,      // error
+                                          0,      // dispo data
+                                          false);
+        if (rmsg->dispo == PN_ACCEPTED) {
+            bool need_close = false;
+            h1_codec_tx_done(hreq->base.lib_rs, &need_close);
+            hreq->close_on_complete = need_close || hreq->close_on_complete;
+        } else {
+            // The response was bad.  There's not much that can be done to
+            // recover, so for now I punt...
+            qdr_http1_close_connection(hconn, "Cannot parse response message");
+        }
+    }
+
+    return 0;
+}
+
+
+//
+// Misc
+//
+
+
+// free the response message
+//
+static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
+{
+    DEQ_REMOVE(req->responses, rmsg);
+    if (rmsg->dlv) {
+        qdr_delivery_set_context(rmsg->dlv, 0);
+        qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 adaptor response settled");
+    }
+
+    qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
+
+    free__client_response_msg_t(rmsg);
+}
+
+
+// Check the head response message for buffers that need to be sent
+//
+static void _write_pending_response(_client_request_t *hreq)
+{
+    if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+        assert(DEQ_PREV(&hreq->base) == 0);  // must preserve order
+        _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+        if (rmsg && rmsg->out_data.write_ptr) {
+            uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%i] %"PRIu64" octets written",
+                   hreq->base.hconn->conn_id, written);
+        }
+    }
+}
+
+
+static void _client_request_free(_client_request_t *hreq)
+{
+    if (hreq) {
+        qdr_http1_request_base_cleanup(&hreq->base);
+        qd_message_free(hreq->request_msg);
+        if (hreq->request_dlv) {
+            qdr_delivery_set_context(hreq->request_dlv, 0);
+            qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+        }
+        qd_compose_free(hreq->request_props);
+
+        _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+        while (rmsg) {
+            _client_response_msg_free(hreq, rmsg);
+            rmsg = DEQ_HEAD(hreq->responses);
+        }
+
+        free__client_request_t(hreq);
+    }
+}
+
+
+static void _client_connection_free(qdr_http1_connection_t *hconn)
+{
+    for (_client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+         hreq;
+         hreq = (_client_request_t*) DEQ_HEAD(hconn->requests)) {
+        _client_request_free(hreq);
+    }
+    qdr_http1_connection_free(hconn);
+}
diff --git a/src/adaptors/http1/http1_lib.c b/src/adaptors/http1/http1_codec.c
similarity index 66%
rename from src/adaptors/http1/http1_lib.c
rename to src/adaptors/http1/http1_codec.c
index 102a1aa..bec96fa 100644
--- a/src/adaptors/http1/http1_lib.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -18,7 +18,7 @@
  *
  */
 
-#include <qpid/dispatch/http1_lib.h>
+#include <qpid/dispatch/http1_codec.h>
 
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/buffer.h>
@@ -27,6 +27,17 @@
 #include <ctype.h>
 #include <stdio.h>
 #include <string.h>
+#include <assert.h>
+
+
+//
+// This file contains code for encoding/decoding an HTTP/1.x data stream.  See
+// http1_codec.h for details.
+//
+
+
+// @TODO(kgiusti)
+// - properly set 'more' flag on rx_body() callback
 
 
 const uint8_t CR_TOKEN = '\r';
@@ -38,6 +49,17 @@ const qd_iterator_pointer_t NULL_I_PTR = {0};
 // true for informational response codes
 #define IS_INFO_RESPONSE(code) ((code) / 100 == 1)
 
+// true if response code indicates that the response will NOT contain a body
+// 204 = No Content
+// 205 = Reset Content
+// 304 = Not Modified
+#define NO_BODY_RESPONSE(code) \
+    ((code) == 204 ||          \
+     (code) == 205 ||          \
+     (code) == 304 ||          \
+     IS_INFO_RESPONSE(code))
+
+
 typedef enum {
     HTTP1_MSG_STATE_START = 0, // parsing start-line
     HTTP1_MSG_STATE_HEADERS,   // parsing headers
@@ -53,37 +75,49 @@ typedef enum {
 } http1_chunk_state_t;
 
 
-typedef struct scratch_buffer_t {
+typedef struct scratch_memory_t {
     uint8_t *buf;
-    size_t   size;  // of buffer, not contents!
-} scratch_buffer_t;
+    size_t   size;  // of allocated memory, not contents!
+} scratch_memory_t;
 
 
-// state for a single request-response transaction
+// State for a single request-response transaction.
 //
-struct http1_transfer_t {
-    DEQ_LINKS(struct http1_transfer_t);
-    void         *context;
-    http1_conn_t *conn;
-    uint32_t      response_code;
-
-    bool close_on_done;     // true if connection must be closed when transfer completes
-    bool is_head_method;    // true if request method is HEAD
-    bool is_connect_method; // true if request method is CONNECT TODO(kgiusti): supported?
+// A new state is created when a request starts (either via the rx_request
+// callback in the case of client connections or the h1_codec_tx_request() call
+// for server connections).
+//
+// For a connection to a server the rx_response callbacks will occur in the same
+// order as h1_codec_tx_request calls are made.
+//
+// For a connection to a client the caller must ensure that calls to
+// h1_codec_tx_response() must be made in the same order as rx_request callbacks
+// occur.
+//
+struct h1_codec_request_state_t {
+    DEQ_LINKS(struct h1_codec_request_state_t);
+    void                *context;
+    h1_codec_connection_t *conn;
+    char                *method;
+    uint32_t             response_code;
+
+    bool no_body_method;    // true if request method is either HEAD or CONNECT
+    bool request_complete;  // true when request message done encoding/decoding
+    bool response_complete; // true when response message done encoding/decoding
 };
-DEQ_DECLARE(http1_transfer_t, http1_transfer_list_t);
-ALLOC_DECLARE(http1_transfer_t);
-ALLOC_DEFINE(http1_transfer_t);
+DEQ_DECLARE(h1_codec_request_state_t, h1_codec_request_state_list_t);
+ALLOC_DECLARE(h1_codec_request_state_t);
+ALLOC_DEFINE(h1_codec_request_state_t);
 
 
 // The HTTP/1.1 connection
 //
-struct http1_conn_t {
+struct h1_codec_connection_t {
     void *context;
 
     // http requests are added to tail,
     // in-progress response is at head
-    http1_transfer_list_t xfers;
+    h1_codec_request_state_list_t hrs_queue;
 
     // Decoder for current incoming msg.
     //
@@ -103,19 +137,18 @@ struct http1_conn_t {
         qd_iterator_pointer_t  read_ptr;
         qd_iterator_pointer_t  body_ptr;
 
-        http1_transfer_t      *xfer;            // current transfer
-        http1_msg_state_t      state;
-        scratch_buffer_t       scratch;
-        int                    error;
-        const char            *error_msg;
+        h1_codec_request_state_t *hrs;            // current request/response
+        http1_msg_state_t       state;
+        scratch_memory_t        scratch;
+        const char             *error_msg;
+        int                     error;
 
-        uint64_t               content_length;
+        intmax_t               content_length;
         http1_chunk_state_t    chunk_state;
         uint64_t               chunk_length;
 
         bool is_request;
         bool is_chunked;
-        bool is_1_0;
 
         // decoded headers
         bool hdr_transfer_encoding;
@@ -135,53 +168,57 @@ struct http1_conn_t {
     struct encoder_t {
         qd_buffer_list_t       outgoing;
         qd_iterator_pointer_t  write_ptr;
-        http1_transfer_t      *xfer;           // current transfer
+        h1_codec_request_state_t *hrs;           // current request/response state
 
+        bool headers_sent;  // true after all headers have been sent
         bool is_request;
-        bool crlf_sent;    // true if the CRLF after headers has been sent
+        bool is_chunked;
+
+        // headers provided
+        bool hdr_content_length;
     } encoder;
 
-    http1_conn_config_t config;
+    h1_codec_config_t config;
 };
-ALLOC_DECLARE(http1_conn_t);
-ALLOC_DEFINE(http1_conn_t);
+ALLOC_DECLARE(h1_codec_connection_t);
+ALLOC_DEFINE(h1_codec_connection_t);
 
 static void decoder_reset(struct decoder_t *d);
 static void encoder_reset(struct encoder_t *e);
 
 
-// Create a new transfer - this is done when a new http request occurs
-// Keep oldest outstanding tranfer at DEQ_HEAD(conn->xfers)
-static http1_transfer_t *http1_transfer(http1_conn_t *conn)
+// Create a new request state - this is done when a new http request occurs
+// Keep oldest outstanding tranfer at DEQ_HEAD(conn->hrs_queue)
+static h1_codec_request_state_t *h1_codec_request_state(h1_codec_connection_t *conn)
 {
-    http1_transfer_t *xfer = new_http1_transfer_t();
-    ZERO(xfer);
-    xfer->conn = conn;
-    DEQ_INSERT_TAIL(conn->xfers, xfer);
-    return xfer;
+    h1_codec_request_state_t *hrs = new_h1_codec_request_state_t();
+    ZERO(hrs);
+    hrs->conn = conn;
+    DEQ_INSERT_TAIL(conn->hrs_queue, hrs);
+    return hrs;
 }
 
 
-static void http1_transfer_free(http1_transfer_t *xfer)
+static void h1_codec_request_state_free(h1_codec_request_state_t *hrs)
 {
-    if (xfer) {
-        http1_conn_t *conn = xfer->conn;
-        assert(conn->decoder.xfer != xfer);
-        assert(conn->encoder.xfer != xfer);
-        DEQ_REMOVE(conn->xfers, xfer);
-        free_http1_transfer_t(xfer);
+    if (hrs) {
+        h1_codec_connection_t *conn = hrs->conn;
+        assert(conn->decoder.hrs != hrs);
+        assert(conn->encoder.hrs != hrs);
+        DEQ_REMOVE(conn->hrs_queue, hrs);
+        free_h1_codec_request_state_t(hrs);
     }
 }
 
 
-http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
+h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context)
 {
-    http1_conn_t *conn = new_http1_conn_t();
+    h1_codec_connection_t *conn = new_h1_codec_connection_t();
     ZERO(conn);
 
     conn->context = context;
     conn->config = *config;
-    DEQ_INIT(conn->xfers);
+    DEQ_INIT(conn->hrs_queue);
 
     encoder_reset(&conn->encoder);
     DEQ_INIT(conn->encoder.outgoing);
@@ -195,46 +232,44 @@ http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
 }
 
 
-// Close the connection conn.
-//
-// This cancels all outstanding transfers and destroys the connection.  If
-// there is an incoming response message body being parsed when this function
-// is invoked it will signal the end of the message.
+// The connection has closed.  If this is a connection to a server this may
+// simply be the end of the response message. If so mark it complete.
 //
-void http1_connection_close(http1_conn_t *conn)
+void h1_codec_connection_closed(h1_codec_connection_t *conn)
 {
     if (conn) {
-        struct decoder_t *decoder = &conn->decoder;
-        if (!decoder->error) {
-            if (decoder->state == HTTP1_MSG_STATE_BODY
-                && decoder->xfer) {
-
-                // terminate the incoming message as the server has closed the
-                // connection to indicate the end of the message
-                conn->config.xfer_rx_done(decoder->xfer);
-            }
-
-            // notify any outstanding transfers
-            http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
-            while (xfer) {
-                conn->config.xfer_done(xfer);
-                xfer = DEQ_NEXT(xfer);
+        if (conn->config.type == HTTP1_CONN_SERVER) {
+            struct decoder_t *decoder = &conn->decoder;
+            if (decoder->hrs &&
+                decoder->hrs->request_complete &&
+                decoder->state == HTTP1_MSG_STATE_BODY) {
+
+                h1_codec_request_state_t *hrs = decoder->hrs;
+                decoder_reset(decoder);
+                hrs->response_complete = true;
+                conn->config.rx_done(hrs);
+                conn->config.request_complete(hrs, false);
+                h1_codec_request_state_free(hrs);
             }
         }
+    }
+}
 
+
+// Free the connection
+//
+void h1_codec_connection_free(h1_codec_connection_t *conn)
+{
+    if (conn) {
+        // expect application to cancel all requests!
+        assert(DEQ_IS_EMPTY(conn->hrs_queue));
         decoder_reset(&conn->decoder);
         encoder_reset(&conn->encoder);
         qd_buffer_list_free_buffers(&conn->decoder.incoming);
         qd_buffer_list_free_buffers(&conn->encoder.outgoing);
         free(conn->decoder.scratch.buf);
 
-        http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
-        while (xfer) {
-            http1_transfer_free(xfer);  // removes from conn->xfers list
-            xfer = DEQ_HEAD(conn->xfers);
-        }
-
-        free_http1_conn_t(conn);
+        free_h1_codec_connection_t(conn);
     }
 }
 
@@ -247,29 +282,57 @@ static void decoder_reset(struct decoder_t *decoder)
     // track the current position in the incoming data stream
 
     decoder->body_ptr = NULL_I_PTR;
-    decoder->xfer = 0;
+    decoder->hrs = 0;
     decoder->state = HTTP1_MSG_STATE_START;
     decoder->content_length = 0;
     decoder->chunk_state = HTTP1_CHUNK_HEADER;
     decoder->chunk_length = 0;
     decoder->error = 0;
     decoder->error_msg = 0;
-
     decoder->is_request = false;
     decoder->is_chunked = false;
-    decoder->is_1_0 = false;
-
     decoder->hdr_transfer_encoding = false;
     decoder->hdr_content_length = false;
 }
 
+
 // reset the tx encoder after message sent
 static void encoder_reset(struct encoder_t *encoder)
 {
     // do not touch the write_ptr or the outgoing queue as there may be more messages to send.
-    encoder->xfer = 0;
+    encoder->hrs = 0;
+    encoder->headers_sent = false;
     encoder->is_request = false;
-    encoder->crlf_sent = false;
+    encoder->is_chunked = false;
+    encoder->hdr_content_length = false;
+}
+
+
+// convert a string representation of a Content-Length value to
+// and integer.  Return true if parse succeeds
+//
+static bool _parse_content_length(const char *clen, intmax_t *value)
+{
+    // a valid value is an integer >= 0
+    *value = 0;
+    return sscanf(clen, "%"PRIdMAX, value) == 1 && *value > -1;
+}
+
+
+// Scan the value of a Transfer-Encoding header to see if the
+// last encoding is chunked
+//
+static bool _is_transfer_chunked(const char *encoding)
+{
+    // "chunked" must be the last item in the value string.  And remember kids:
+    // coding type names are case insensitive!
+    //
+    size_t len = strlen(encoding);
+    if (len >= 7) {  // 7 = strlen("chunked")
+        const char *ptr = encoding + len - 7;
+        return strcasecmp("chunked", ptr) == 0;
+    }
+    return false;
 }
 
 
@@ -364,8 +427,11 @@ static bool is_empty_line(const qd_iterator_pointer_t *line)
     return false;
 }
 
+
+// for debug:
 static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_pointer_t *ptr)
 {
+#if 0
     qd_iterator_pointer_t tmp = *ptr;
     fprintf(stdout, "%s '", prefix);
     size_t len = MIN(tmp.remaining, 80);
@@ -375,6 +441,7 @@ static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_p
     }
     fprintf(stdout, "%s'\n", (tmp.remaining) ? " <truncated>" : "");
     fflush(stdout);
+#endif
 }
 
 
@@ -414,7 +481,7 @@ static bool read_line(qd_iterator_pointer_t *data, qd_iterator_pointer_t *line)
 }
 
 
-static bool ensure_scratch_size(scratch_buffer_t *b, size_t required)
+static bool ensure_scratch_size(scratch_memory_t *b, size_t required)
 {
     if (b->size < required) {
         if (b->buf)
@@ -514,7 +581,7 @@ static bool parse_field(qd_iterator_pointer_t *line, qd_iterator_pointer_t *fiel
 // parse the HTTP/1.1 request line:
 // "method SP request-target SP HTTP-version CRLF"
 //
-static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+static bool parse_request_line(h1_codec_connection_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
 {
     qd_iterator_pointer_t method = {0};
     qd_iterator_pointer_t target = {0};
@@ -561,21 +628,20 @@ static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd
         return decoder->error;
     }
 
-    http1_transfer_t *xfer = http1_transfer(conn);
+    h1_codec_request_state_t *hrs = h1_codec_request_state(conn);
 
     // check for methods that do not support body content in the response:
-    if (strcmp((char*)method_str, "HEAD") == 0)
-        xfer->is_head_method = true;
-    else if (strcmp((char*)method_str, "CONNECT") == 0)
-        xfer->is_connect_method = true;
+    hrs->no_body_method = (strcmp((char*)method_str, "HEAD") == 0 ||
+                           strcmp((char*)method_str, "CONNECT") == 0);
+
+    hrs->method = qd_strdup((char*) method_str);
 
-    decoder->xfer = xfer;
+    decoder->hrs = hrs;
     decoder->is_request = true;
-    decoder->is_1_0 = (minor == 0);
 
-    decoder->error = conn->config.xfer_rx_request(xfer, (char*)method_str, (char*)target_str, (char*)version_str);
+    decoder->error = conn->config.rx_request(hrs, (char*)method_str, (char*)target_str, major, minor);
     if (decoder->error)
-        decoder->error_msg = "xfer_rx_request callback error";
+        decoder->error_msg = "hrs_rx_request callback error";
     return decoder->error;
 }
 
@@ -583,7 +649,7 @@ static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd
 // parse the HTTP/1.1 response line
 // "HTTP-version SP status-code [SP reason-phrase] CRLF"
 //
-static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+static int parse_response_line(h1_codec_connection_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
 {
     qd_iterator_pointer_t version = {0};
     qd_iterator_pointer_t status_code = {0};
@@ -599,23 +665,23 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
     }
 
     // Responses arrive in the same order as requests are generated so this new
-    // response corresponds to head xfer
-    http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
-    if (!xfer) {
+    // response corresponds to head hrs
+    h1_codec_request_state_t *hrs = DEQ_HEAD(conn->hrs_queue);
+    if (!hrs) {
         // receiving a response without a corresponding request
         decoder->error_msg = "Spurious HTTP response received";
         decoder->error = HTTP1_STATUS_SERVER_ERR;
         return decoder->error;
     }
 
-    assert(!decoder->xfer);   // state machine violation
-    assert(xfer->response_code == 0);
+    assert(!decoder->hrs);   // state machine violation
+    assert(hrs->response_code == 0);
 
-    decoder->xfer = xfer;
+    decoder->hrs = hrs;
 
     unsigned char code_str[4];
     pointer_2_str(&status_code, code_str, 4);
-    xfer->response_code = atoi((char*) code_str);
+    hrs->response_code = atoi((char*) code_str);
 
     // the reason phrase is optional, and may contain spaces
 
@@ -652,13 +718,13 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
     }
 
     decoder->is_request = false;
-    decoder->is_1_0 = (minor == 0);
 
-    decoder->error = conn->config.xfer_rx_response(decoder->xfer, (char*)version_str,
-                                                   xfer->response_code,
-                                                   (offset) ? (char*)reason_str: 0);
+    decoder->error = conn->config.rx_response(decoder->hrs,
+                                              hrs->response_code,
+                                              (offset) ? (char*)reason_str: 0,
+                                              major, minor);
     if (decoder->error)
-        decoder->error_msg = "xfer_rx_response callback error";
+        decoder->error_msg = "hrs_rx_response callback error";
 
     return decoder->error;
 }
@@ -666,7 +732,7 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
 
 // parse the first line of an incoming http message
 //
-static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_start_line(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t line;
@@ -695,7 +761,7 @@ static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
 // Called after the last incoming header was decoded and passed to the
 // application
 //
-static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
+static bool process_headers_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     // Flush all buffers processed so far - no longer needed
 
@@ -721,12 +787,6 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
         }
     }
 
-    decoder->error = conn->config.xfer_rx_headers_done(decoder->xfer);
-    if (decoder->error) {
-        decoder->error_msg = "xfer_rx_headers_done callback error";
-        return false;
-    }
-
     // determine if a body is present (ref RFC7230 sec 3.3.3 Message Body Length)
     bool has_body;
     if (decoder->is_request) {
@@ -740,13 +800,8 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
         // size via Content-Length or chunked encoder, OR its length is unspecified
         // and the message body is terminated by closing the connection.
         //
-        http1_transfer_t *xfer = decoder->xfer;
-        has_body = !(xfer->is_head_method       ||
-                     xfer->is_connect_method    ||
-                     xfer->response_code == 204 ||     // No Content
-                     xfer->response_code == 205 ||     // Reset Content
-                     xfer->response_code == 304 ||     // Not Modified
-                     IS_INFO_RESPONSE(xfer->response_code));
+        h1_codec_request_state_t *hrs = decoder->hrs;
+        has_body = !(hrs->no_body_method || NO_BODY_RESPONSE(hrs->response_code));
         if (has_body) {
             // no body if explicit Content-Length of zero
             if (decoder->hdr_content_length && decoder->content_length == 0) {
@@ -755,6 +810,12 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
         }
     }
 
+    decoder->error = conn->config.rx_headers_done(decoder->hrs, has_body);
+    if (decoder->error) {
+        decoder->error_msg = "hrs_rx_headers_done callback error";
+        return false;
+    }
+
     if (has_body) {
         // start tracking the body buffer chain
         decoder->body_ptr = decoder->read_ptr;
@@ -770,13 +831,13 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
 
 // process a received header to determine message body length, etc.
 //
-static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
+static int process_header(h1_codec_connection_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
 {
     int parse_error = decoder->is_request ? HTTP1_STATUS_BAD_REQ : HTTP1_STATUS_SERVER_ERR;
 
     if (strcasecmp("Content-Length", (char*) key) == 0) {
-        uint64_t old = decoder->content_length;
-        if (sscanf((char*)value, "%"PRIu64, &decoder->content_length) != 1) {
+        intmax_t old = decoder->content_length;
+        if (!_parse_content_length((char*) value, &decoder->content_length)) {
             decoder->error_msg = "Malformed Content-Length header";
             decoder->error = parse_error;
             return decoder->error;
@@ -788,16 +849,8 @@ static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const u
         }
         decoder->hdr_content_length = true;
 
-    } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
-        // check if "chunked" is present and it is the last item in the value
-        // string.  And remember kids: coding type names are case insensitive!
-        // Also note "value" has already been trimmed of whitespace at both
-        // ends.
-        size_t len = strlen((char*)value);
-        if (len >= 7) {  // 7 = strlen("chunked")
-            const char *ptr = ((char*) value) + len - 7;
-            decoder->is_chunked = strcasecmp("chunked", ptr) == 0;
-        }
+    } else if (strcasecmp("Transfer-Encoding", (char*) key) == 0) {
+        decoder->is_chunked = _is_transfer_chunked((char*) value);
         decoder->hdr_transfer_encoding = true;
     }
 
@@ -807,12 +860,12 @@ static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const u
 
 // Parse out the header key and value
 //
-static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t line;
-    http1_transfer_t *xfer = decoder->xfer;
-    assert(xfer);  // else state machine busted
+    h1_codec_request_state_t *hrs = decoder->hrs;
+    assert(hrs);  // else state machine busted
 
     if (read_line(rptr, &line)) {
         debug_print_iterator_pointer("header:", &line);
@@ -862,9 +915,9 @@ static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
         process_header(conn, decoder, key_str, value_str);
 
         if (!decoder->error) {
-            decoder->error = conn->config.xfer_rx_header(xfer, (char *)key_str, (char *)value_str);
+            decoder->error = conn->config.rx_header(hrs, (char *)key_str, (char *)value_str);
             if (decoder->error)
-                decoder->error_msg = "xfer_rx_header callback error";
+                decoder->error_msg = "hrs_rx_header callback error";
         }
 
         return !!rptr->remaining;
@@ -880,7 +933,7 @@ static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
 
 // Pass message body data up to the application.
 //
-static inline int consume_body_data(http1_conn_t *conn, bool flush)
+static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
 {
     struct decoder_t *decoder = &conn->decoder;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -889,9 +942,10 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
     // shortcut: if no more data to parse send the entire incoming chain
     if (rptr->remaining == 0) {
 
-        decoder->error = conn->config.xfer_rx_body(decoder->xfer, &decoder->incoming,
-                                                   body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
-                                                   body_ptr->remaining);
+        decoder->error = conn->config.rx_body(decoder->hrs, &decoder->incoming,
+                                              body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
+                                              body_ptr->remaining,
+                                              true);
         DEQ_INIT(decoder->incoming);
         *body_ptr = NULL_I_PTR;
         *rptr = NULL_I_PTR;
@@ -902,7 +956,7 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
     // unparsed data.  Send any buffers preceding the current read pointer.
     qd_buffer_list_t blist = DEQ_EMPTY;
     size_t octets = 0;
-    const size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+    size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
 
     // invariant:
     assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
@@ -928,12 +982,15 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
         qd_buffer_insert(tail, body_ptr->remaining);
         DEQ_INSERT_TAIL(blist, tail);
         octets += body_ptr->remaining;
+        if (DEQ_SIZE(blist) == 1)
+            body_offset = 0;
 
         *body_ptr = *rptr;
         body_ptr->remaining = 0;
     }
 
-    decoder->error = conn->config.xfer_rx_body(decoder->xfer, &blist, body_offset, octets);
+    if (octets)
+        decoder->error = conn->config.rx_body(decoder->hrs, &blist, body_offset, octets, true);
     return decoder->error;
 }
 
@@ -942,7 +999,7 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
 // parsing the start of a chunked header:
 // <chunk size in hex>CRLF
 //
-static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t line;
@@ -984,7 +1041,7 @@ static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *deco
 
 // Parse the data section of a chunk
 //
-static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1007,7 +1064,7 @@ static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decode
 
 // Keep reading chunk trailers until the terminating empty line is read
 //
-static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_trailer(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1032,7 +1089,7 @@ static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *dec
 
 // parse an incoming message body which is chunk encoded
 // Return True if there is more data pending to parse
-static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     bool more = true;
     switch (decoder->chunk_state) {
@@ -1056,7 +1113,7 @@ static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
 
 // parse an incoming message body which is Content-Length bytes long
 //
-static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_content(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     qd_iterator_pointer_t *rptr = &decoder->read_ptr;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1074,7 +1131,7 @@ static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
 }
 
 
-static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
     if (decoder->is_chunked)
         return parse_body_chunked(conn, decoder);
@@ -1084,13 +1141,14 @@ static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
 
     // otherwise no explict body size, so just keep passing the entire unparsed
     // incoming chain along until the remote closes the connection
-    decoder->error = conn->config.xfer_rx_body(decoder->xfer,
-                                               &decoder->incoming,
-                                               decoder->read_ptr.cursor
-                                               - qd_buffer_base(decoder->read_ptr.buffer),
-                                               decoder->read_ptr.remaining);
+    decoder->error = conn->config.rx_body(decoder->hrs,
+                                          &decoder->incoming,
+                                          decoder->read_ptr.cursor
+                                          - qd_buffer_base(decoder->read_ptr.buffer),
+                                          decoder->read_ptr.remaining,
+                                          true);
     if (decoder->error) {
-        decoder->error_msg = "xfer_rx_body callback error";
+        decoder->error_msg = "hrs_rx_body callback error";
         return false;
     }
 
@@ -1103,39 +1161,40 @@ static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
 
 // Called when incoming message is complete
 //
-static bool parse_done(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
 {
-    http1_transfer_t *xfer = decoder->xfer;
+    h1_codec_request_state_t *hrs = decoder->hrs;
     bool is_response = !decoder->is_request;
 
-    if (!decoder->error) {
-        // signal the message receive is complete
-        conn->config.xfer_rx_done(xfer);
+    // signal the message receive is complete
+    conn->config.rx_done(hrs);
 
-        if (is_response) {   // request<->response transfer complete
-
-            // Informational 1xx response codes are NOT teriminal - further responses are allowed!
-            if (IS_INFO_RESPONSE(xfer->response_code)) {
-                xfer->response_code = 0;
-            } else {
-                // The message exchange is complete
-                conn->config.xfer_done(xfer);
-                decoder->xfer = 0;
-                http1_transfer_free(xfer);
-            }
+    if (is_response) {
+        // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+        if (IS_INFO_RESPONSE(hrs->response_code)) {
+            hrs->response_code = 0;
+        } else {
+            hrs->response_complete = true;
         }
+    } else {
+        hrs->request_complete = true;
+    }
 
-        decoder_reset(decoder);
-        return !!decoder->read_ptr.remaining;
+    if (hrs->request_complete && hrs->response_complete) {
+        conn->config.request_complete(hrs, false);
+        decoder->hrs = 0;
+        h1_codec_request_state_free(hrs);
     }
-    return false;
+
+    decoder_reset(decoder);
+    return !!decoder->read_ptr.remaining;
 }
 
 
 // Main decode loop.
 // Process received data until it is exhausted
 //
-static int decode_incoming(http1_conn_t *conn)
+static int decode_incoming(h1_codec_connection_t *conn)
 {
     struct decoder_t *decoder = &conn->decoder;
     bool more = true;
@@ -1159,20 +1218,18 @@ static int decode_incoming(http1_conn_t *conn)
 }
 
 
-void *http1_connection_get_context(http1_conn_t *conn)
+void *h1_codec_connection_get_context(h1_codec_connection_t *conn)
 {
     return conn->context;
 }
 
 // Push inbound network data into the http1 protocol engine.
 //
-// All of the xfer_rx callback will occur in the context of this call. This
+// All of the hrs_rx callback will occur in the context of this call. This
 // returns zero on success otherwise an error code.  Any error occuring during
-// a callback will be reflected in the return value of this function.  It is
-// expected that the caller will call http1_connection_close on a non-zero
-// return value.
+// a callback will be reflected in the return value of this function.
 //
-int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len)
+int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, size_t len)
 {
     struct decoder_t *decoder = &conn->decoder;
     bool init_ptrs = DEQ_IS_EMPTY(decoder->incoming);
@@ -1195,79 +1252,117 @@ int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t
     return decode_incoming(conn);
 }
 
-void http1_transfer_set_context(http1_transfer_t *xfer, void *context)
+
+void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context)
 {
-    xfer->context = context;
+    hrs->context = context;
 }
 
-void *http1_transfer_get_context(const http1_transfer_t *xfer)
+
+void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs)
+{
+    return hrs->context;
+}
+
+
+h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs)
+{
+    return hrs->conn;
+}
+
+
+const char *h1_codec_request_state_method(const h1_codec_request_state_t *hrs)
 {
-    return xfer->context;
+    return hrs->method;
 }
 
-http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer)
+
+void h1_codec_request_state_cancel(h1_codec_request_state_t *hrs)
 {
-    return xfer->conn;
+    if (hrs) {
+        h1_codec_connection_t *conn = hrs->conn;
+        if (hrs == conn->decoder.hrs) {
+            decoder_reset(&conn->decoder);
+        }
+        if (hrs == conn->encoder.hrs) {
+            encoder_reset(&conn->encoder);
+        }
+        conn->config.request_complete(hrs, true);
+        h1_codec_request_state_free(hrs);
+    }
 }
 
 
-// initiate a new HTTP request.  This creates a new transfer.
+// initiate a new HTTP request.  This creates a new request state.
 // request = <method>SP<target>SP<version>CRLF
-// Expects version to be in the format "HTTP/X.Y"
 //
-http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version)
+h1_codec_request_state_t *h1_codec_tx_request(h1_codec_connection_t *conn, const char *method, const char *target,
+                                              uint32_t version_major, uint32_t version_minor)
 {
     struct encoder_t *encoder = &conn->encoder;
-    assert(!encoder->xfer);   // error: transfer already in progress
+    assert(!encoder->hrs);   // error: transfer already in progress
     assert(conn->config.type == HTTP1_CONN_SERVER);
 
-    http1_transfer_t *xfer = encoder->xfer = http1_transfer(conn);
+    h1_codec_request_state_t *hrs = encoder->hrs = h1_codec_request_state(conn);
     encoder->is_request = true;
-    encoder->crlf_sent = false;
+    encoder->headers_sent = false;
 
-    if (strcmp((char*)method, "HEAD") == 0)
-        xfer->is_head_method = true;
-    else if (strcmp((char*)method, "CONNECT") == 0)
-        xfer->is_connect_method = true;
+    hrs->method = qd_strdup(method);
+
+    // check for methods that do not support body content in the response:
+    hrs->no_body_method = (strcmp((char*)method, "HEAD") == 0 ||
+                           strcmp((char*)method, "CONNECT") == 0);
 
     write_string(encoder, method);
     write_string(encoder, " ");
     write_string(encoder, target);
     write_string(encoder, " ");
-    write_string(encoder, version);
+    {
+        char version[64];
+        snprintf(version, 64, "HTTP/%"PRIu32".%"PRIu32, version_major, version_minor);
+        write_string(encoder, version);
+    }
     write_string(encoder, CRLF);
 
-    return xfer;
+    return hrs;
 }
 
 
-// Send an HTTP response msg.  xfer must correspond to the "oldest" outstanding
-// request that arrived via the xfer_rx_request callback for this connection.
-// version is expected to be in the form "HTTP/x.y"
+// Send an HTTP response msg.  hrs must correspond to the "oldest" outstanding
+// request that arrived via the hrs_rx_request callback for this connection.
 // status_code is expected to be 100 <= status_code <= 999
 // status-line = HTTP-version SP status-code SP reason-phrase CRLF
 //
-int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase)
+int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const char *reason_phrase,
+                         uint32_t version_major, uint32_t version_minor)
 {
-    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
     struct encoder_t *encoder = &conn->encoder;
 
     assert(conn->config.type == HTTP1_CONN_CLIENT);
-    assert(!encoder->xfer);   // error: transfer already in progress
-    assert(DEQ_HEAD(conn->xfers) == xfer);   // error: response not in order!
-    assert(xfer->response_code == 0);
+    assert(!encoder->hrs);   // error: transfer already in progress
+    assert(DEQ_HEAD(conn->hrs_queue) == hrs);   // error: response not in order!
+    assert(hrs->response_code == 0);
 
-    encoder->xfer = xfer;
+    encoder->hrs = hrs;
     encoder->is_request = false;
-    encoder->crlf_sent = false;
-    xfer->response_code = status_code;
+    encoder->headers_sent = false;
+    hrs->response_code = status_code;
 
-    char code_str[32];
-    snprintf(code_str, 32, "%d", status_code);
+    {
+        char version[64];
+        snprintf(version, 64, "HTTP/%"PRIu32".%"PRIu32, version_major, version_minor);
+        write_string(encoder, version);
+    }
 
-    write_string(encoder, version);
     write_string(encoder, " ");
-    write_string(encoder, code_str);
+
+    {
+        char code_str[32];
+        snprintf(code_str, 32, "%d", status_code);
+        write_string(encoder, code_str);
+    }
+
     if (reason_phrase) {
         write_string(encoder, " ");
         write_string(encoder, reason_phrase);
@@ -1280,17 +1375,26 @@ int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_co
 
 // Add a header field to an outgoing message
 // header-field   = field-name ":" OWS field-value OWS
-int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value)
+int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value)
 {
-    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
     struct encoder_t *encoder = &conn->encoder;
-    assert(encoder->xfer == xfer);  // xfer not current transfer
+    assert(encoder->hrs == hrs);  // hrs not current transfer
 
     write_string(encoder, key);
     write_string(encoder, ": ");
     write_string(encoder, value);
     write_string(encoder, CRLF);
 
+    // determine if the body length is provided. If not
+    // the caller will have to close the connection
+    //
+    if (strcasecmp("Content-Length", (char*) key) == 0) {
+        encoder->hdr_content_length = true;
+    } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
+        encoder->is_chunked = _is_transfer_chunked(value);
+    }
+
     // check to see if there are any full buffers that can be sent.
 
     qd_buffer_list_t blist = DEQ_EMPTY;
@@ -1300,78 +1404,100 @@ int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *val
         DEQ_REMOVE_HEAD(encoder->outgoing);
         DEQ_INSERT_TAIL(blist, buf);
         octets += qd_buffer_size(buf);
+        buf = DEQ_HEAD(encoder->outgoing);
     }
     if (!DEQ_IS_EMPTY(blist))
-        conn->config.conn_tx_data(conn, &blist, 0, octets);
+        conn->config.tx_buffers(hrs, &blist, octets);
 
     return 0;
 }
 
 
-// just forward the body chain along
-int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len)
+static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_t *encoder)
 {
-    http1_conn_t *conn = http1_transfer_get_connection(xfer);
-    struct encoder_t *encoder = &conn->encoder;
-
-    fprintf(stderr, "http1_tx_body(offset=%zu size=%zu)\n", offset, len);
-    
-    if (!encoder->crlf_sent) {
+    if (!encoder->headers_sent) {
         // need to terminate any headers by sending the plain CRLF that follows
         // the headers
         write_string(encoder, CRLF);
 
         // flush all pending output.  From this point out the outgoing queue is
         // no longer used for this message
-        fprintf(stderr, "Flushing before body: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
-        conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+        hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, qd_buffer_list_length(&encoder->outgoing));
         DEQ_INIT(encoder->outgoing);
         encoder->write_ptr = NULL_I_PTR;
-        encoder->crlf_sent = true;
+        encoder->headers_sent = true;
     }
+}
+
+
+// just forward the body chain along
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+    struct encoder_t *encoder = &conn->encoder;
+
+    if (!encoder->headers_sent)
+        _flush_headers(hrs, encoder);
 
     // skip the outgoing queue and send directly
-    fprintf(stderr, "Sending body data %zu bytes\n", len);
-    conn->config.conn_tx_data(conn, data, offset, len);
+    conn->config.tx_body_data(hrs, body_data);
 
     return 0;
 }
 
 
-int http1_tx_done(http1_transfer_t *xfer)
+int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close)
 {
-    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
     struct encoder_t *encoder = &conn->encoder;
+    if (need_close)
+        *need_close = false;
 
-    if (!encoder->crlf_sent) {
-        // need to send the plain CRLF that follows the headers
-        write_string(encoder, CRLF);
-
-        // flush all pending output.
-
-        fprintf(stderr, "Flushing at tx_done: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
-        conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
-        DEQ_INIT(encoder->outgoing);
-        encoder->write_ptr = NULL_I_PTR;
-        encoder->crlf_sent = true;
-    }
+    if (!encoder->headers_sent)
+        _flush_headers(hrs, encoder);
 
     bool is_response = !encoder->is_request;
-    encoder_reset(encoder);
 
     if (is_response) {
-        if (IS_INFO_RESPONSE(xfer->response_code)) {
+        if (IS_INFO_RESPONSE(hrs->response_code)) {
             // this is a non-terminal response. Another response is expected
             // for this request so just reset the transfer state
-            xfer->response_code = 0;
+            hrs->response_code = 0;
         } else {
-            // The message exchange is complete
-            conn->config.xfer_done(xfer);
-            http1_transfer_free(xfer);
+            hrs->response_complete = true;
+            if (need_close) {
+                // if the message body size is not explicit the connection has
+                // to be closed to indicate end of message
+                if (!hrs->no_body_method &&
+                    !NO_BODY_RESPONSE(hrs->response_code) &&
+                    !encoder->is_chunked &&
+                    !encoder->hdr_content_length) {
+
+                    *need_close = true;
+                }
+            }
         }
+    } else {
+        hrs->request_complete = true;
+    }
+
+    encoder_reset(encoder);
+    if (hrs->request_complete && hrs->response_complete) {
+        conn->config.request_complete(hrs, false);
+        h1_codec_request_state_free(hrs);
     }
 
     return 0;
 }
 
 
+bool h1_codec_request_complete(const h1_codec_request_state_t *hrs)
+{
+    return hrs && hrs->request_complete;
+}
+
+
+bool h1_codec_response_complete(const h1_codec_request_state_t *hrs)
+{
+    return hrs && hrs->response_complete;
+}
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
new file mode 100644
index 0000000..5533aee
--- /dev/null
+++ b/src/adaptors/http1/http1_private.h
@@ -0,0 +1,272 @@
+#ifndef http1_private_H
+#define http1_private_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+//
+// HTTP/1.x Adaptor Internals
+//
+// Nomenclature:
+//  "in": for information flowing from the endpoint into the router
+//        (from proactor to core)
+//  "out": for information flowing from the router out to the endpoint
+//         (from core to proactor)
+//
+#include <qpid/dispatch/http1_codec.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+#include <qpid/dispatch/message.h>
+#include "adaptors/http_common.h"
+
+typedef struct qdr_http1_out_data_t      qdr_http1_out_data_t;
+typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
+typedef struct qdr_http1_request_base_t  qdr_http1_request_base_t;
+typedef struct qdr_http1_connection_t    qdr_http1_connection_t;
+
+DEQ_DECLARE(qdr_http1_connection_t, qdr_http1_connection_list_t);
+
+
+typedef struct qdr_http1_adaptor_t {
+    qdr_core_t                  *core;
+    qdr_protocol_adaptor_t      *adaptor;
+    qd_log_source_t             *log;
+    sys_mutex_t                 *lock;  // for the lists
+    qd_http_lsnr_list_t          listeners;
+    qd_http_connector_list_t     connectors;
+    qdr_http1_connection_list_t  connections;
+} qdr_http1_adaptor_t;
+
+extern qdr_http1_adaptor_t *qdr_http1_adaptor;
+
+
+// Data to be written out the raw connection.
+//
+// This adaptor has to cope with two different data sources: the HTTP1 encoder
+// and the qd_message_body_data_t list.  The HTTP1 encoder produces a simple
+// qd_buffer_list_t for outgoing header data whose ownership is given to the
+// adaptor: the adaptor is free to deque/free these buffers as needed.  The
+// qd_message_body_data_t buffers are shared with the owning message and the
+// buffer list must not be modified by the adaptor.  The qdr_http1_out_data_t
+// is used to manage both types of data sources.
+//
+struct qdr_http1_out_data_t {
+    DEQ_LINKS(qdr_http1_out_data_t);
+
+    qdr_http1_out_data_fifo_t *owning_fifo;
+
+    // data is either in a raw buffer chain
+    // or a message body data (not both!)
+
+    qd_buffer_list_t raw_buffers;
+    qd_message_body_data_t *body_data;
+
+    int buffer_count;  // # total buffers
+    int next_buffer;   // offset to next buffer to send
+    int free_count;    // # buffers returned from proton
+};
+ALLOC_DECLARE(qdr_http1_out_data_t);
+DEQ_DECLARE(qdr_http1_out_data_t, qdr_http1_out_data_list_t);
+
+
+//
+// A fifo of outgoing (raw connection) data, oldest at HEAD.
+//
+// write_ptr tracks the point in the fifo where the current out_data node that
+// is being written to the raw connection.  As the raw connection returns
+// written buffers (PN_RAW_CONNECTION_WRITTEN) the are removed from the HEAD
+// and freed.
+//
+struct qdr_http1_out_data_fifo_t {
+    qdr_http1_out_data_list_t fifo;
+    qdr_http1_out_data_t     *write_ptr;
+};
+
+
+// Per HTTP request/response(s) state.
+//
+// This base class is extended for client and server-specific state, see
+// http1_client.c and http1_server.c A reference is stored in
+// qdr_delivery_get_context(dlv)
+//
+struct qdr_http1_request_base_t {
+    DEQ_LINKS(qdr_http1_request_base_t);
+
+    uint64_t                  msg_id;
+    h1_codec_request_state_t *lib_rs;
+    qdr_http1_connection_t   *hconn;  // parent connection
+    char                     *response_addr; // request reply-to
+
+    // statistics
+    //
+    uint64_t  in_http1_octets;    // read from raw conn
+    uint64_t  out_http1_octets;   // written to raw conn
+};
+DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
+
+
+// A single HTTP adaptor connection.
+//
+struct qdr_http1_connection_t {
+    DEQ_LINKS(qdr_http1_connection_t);
+    qd_server_t           *qd_server;
+    h1_codec_connection_t *http_conn;
+    pn_raw_connection_t   *raw_conn;
+    qdr_connection_t      *qdr_conn;
+    qdr_http1_adaptor_t   *adaptor;
+
+    uint64_t               conn_id;
+    qd_handler_context_t   handler_context;
+    h1_codec_connection_type_t     type;
+
+    struct {
+        char *host;
+        char *port;
+        char *address;
+        char *host_port;
+    } cfg;
+
+    // State if connected to an HTTP client
+    //
+    struct {
+        char *client_ip_addr;
+        char *reply_to_addr;   // set once link is up
+        uint64_t next_msg_id;
+    } client;
+
+    // State if connected to an HTTP server
+    struct {
+        qd_timer_t *reconnect_timer;
+        int         reconnect_count;
+    } server;
+
+    // Outgoing link (router ==> HTTP app)
+    //
+    qdr_link_t            *out_link;
+    uint64_t               out_link_id;
+    int                    out_link_credit;  // provided by adaptor
+
+    // Incoming link (HTTP app ==> router)
+    //
+    qdr_link_t            *in_link;
+    uint64_t               in_link_id;
+    int                    in_link_credit;  // provided by router
+
+    // Oldest at HEAD
+    //
+    qdr_http1_request_list_t requests;
+
+    // statistics
+    //
+    uint64_t  in_http1_octets;
+    uint64_t  out_http1_octets;
+
+    // flags
+    //
+    bool trace;
+    bool close_connection;
+};
+ALLOC_DECLARE(qdr_http1_connection_t);
+
+// special AMQP application properties keys for HTTP1 metadata headers
+//
+#define HTTP1_HEADER_PREFIX  "http:"          // reserved prefix
+#define REQUEST_HEADER_KEY   "http:request"   // request msg, value=version
+#define RESPONSE_HEADER_KEY  "http:response"  // response msg, value=version
+#define REASON_HEADER_KEY    "http:reason"    // from response (optional)
+#define TARGET_HEADER_KEY    "http:target"    // request target
+#define STATUS_HEADER_KEY    "http:status"    // response status (integer)
+
+
+// http1_adaptor.c
+//
+//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn);
+//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
+
+void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
+void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data);
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
+void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
+// return the number of buffers currently held by the proactor for writing
+int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data);
+
+void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error);
+void qdr_http1_connection_free(qdr_http1_connection_t *hconn);
+
+void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq);
+void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
+                              int error_code,
+                              const char *reason);
+void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
+                                 const qdr_error_t *error);
+
+
+// return the next HTTP token in a comma separated list of tokens
+//
+// start - search for token start pointer
+// len - length of token if non-null returned
+// next - address of start of next token
+//
+const char *qdr_http1_token_list_next(const char *start, size_t *len, const char **next);
+
+
+
+// http1_client.c protocol adaptor callbacks
+//
+void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t    *adaptor,
+                                     qdr_http1_connection_t *hconn,
+                                     qdr_link_t             *link,
+                                     int                     credit);
+uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
+                                            qdr_http1_connection_t *hconn,
+                                            qdr_link_t             *link,
+                                            qdr_delivery_t         *delivery,
+                                            bool                    settled);
+void qdr_http1_client_core_second_attach(qdr_http1_adaptor_t    *adaptor,
+                                         qdr_http1_connection_t *hconn,
+                                         qdr_link_t             *link,
+                                         qdr_terminus_t         *source,
+                                         qdr_terminus_t         *target);
+void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
+                                           qdr_http1_connection_t   *hconn,
+                                           qdr_http1_request_base_t *hreqb,
+                                           qdr_delivery_t           *dlv,
+                                           uint64_t                  disp,
+                                           bool                      settled);
+
+
+// http1_server.c protocol adaptor callbacks
+//
+void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t    *adaptor,
+                                     qdr_http1_connection_t *hconn,
+                                     qdr_link_t             *link,
+                                     int                     credit);
+uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
+                                            qdr_http1_connection_t *hconn,
+                                            qdr_link_t             *link,
+                                            qdr_delivery_t         *delivery,
+                                            bool                    settled);
+void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
+                                           qdr_http1_connection_t   *hconn,
+                                           qdr_http1_request_base_t *hreq,
+                                           qdr_delivery_t           *dlv,
+                                           uint64_t                  disp,
+                                           bool                      settled);
+
+#endif // http1_private_H
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
new file mode 100644
index 0000000..596f32b
--- /dev/null
+++ b/src/adaptors/http1/http1_server.c
@@ -0,0 +1,1430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http1_private.h"
+#include "adaptors/adaptor_utils.h"
+
+#include <proton/proactor.h>
+
+//
+// This file contains code specific to HTTP server processing.  The raw
+// connection is terminated at an HTTP server, not an HTTP client.
+//
+
+// for debug: dump raw buffers to stdout if true
+#define HTTP1_DUMP_BUFFERS false
+
+
+//
+// State for a single response message arriving via the raw connection.  This
+// message will be decoded into a single AMQP message and forwarded into the
+// core.
+//
+// This object is instantiated when the HTTP1 codec indicates the arrival of a
+// response message (See _server_rx_response_cb()).  The response is considered
+// "complete" after it has been fully encoded and delivered to the core.  The
+// _server_response_msg_t is freed at this point - we do not wait for dispo or
+// settlement from the core since we cannot do anything meaningful should the
+// delivery fail (other than log it).
+//
+typedef struct _server_response_msg_t {
+    DEQ_LINKS(struct _server_response_msg_t);
+
+    struct _server_request_t *hreq; // owning request
+
+    qd_message_t        *msg;       // hold incoming message
+    qd_composed_field_t *msg_props; // hold incoming headers
+    qdr_delivery_t      *dlv;       // inbound to router (qdr_link_deliver)
+    bool                 rx_complete; // response rx complete
+} _server_response_msg_t;
+ALLOC_DECLARE(_server_response_msg_t);
+ALLOC_DEFINE(_server_response_msg_t);
+DEQ_DECLARE(_server_response_msg_t, _server_response_msg_list_t);
+
+
+//
+// State for an HTTP/1.x Request+Response exchange, server facing
+//
+typedef struct _server_request_t {
+    qdr_http1_request_base_t   base;
+
+    // The request arrives via the router core in an AMQP message
+    // (qd_message_t).  These fields are used to encode the response and send
+    // it out the raw connection.
+    //
+    qdr_delivery_t *request_dlv;     // outbound from core_link_deliver
+    uint64_t        request_dispo;   // set by adaptor during encode
+    bool            request_settled; // set by adaptor
+    bool            request_acked;   // true if dispo sent to core
+    bool            request_encoded; // true when encoding done
+    bool            headers_encoded; // True when header encode done
+
+    qdr_http1_out_data_fifo_t out_data;  // encoded request written to raw conn
+
+    _server_response_msg_list_t responses;  // response(s) to this request
+
+    bool codec_completed;     // Request and Response HTTP msgs OK
+    bool cancelled;
+    bool close_on_complete;   // close the conn when this request is complete
+} _server_request_t;
+ALLOC_DECLARE(_server_request_t);
+ALLOC_DEFINE(_server_request_t);
+
+
+//
+// This file contains code specific to HTTP server processing.  The raw
+// connection is terminated at an HTTP server, not an HTTP client.
+//
+
+
+#define DEFAULT_CAPACITY 250
+#define RETRY_PAUSE_MSEC 500
+#define MAX_RECONNECT    5  // 5 * 500 = 2.5 sec
+
+static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
+static void _server_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static int  _server_rx_request_cb(h1_codec_request_state_t *hrs,
+                                  const char *method,
+                                  const char *target,
+                                  uint32_t version_major,
+                                  uint32_t version_minor);
+static int  _server_rx_response_cb(h1_codec_request_state_t *hrs,
+                                   int status_code,
+                                   const char *reason_phrase,
+                                   uint32_t version_major,
+                                   uint32_t version_minor);
+static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value);
+static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body);
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+                              bool more);
+static void _server_rx_done_cb(h1_codec_request_state_t *hrs);
+static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled);
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
+static void _do_reconnect(void *context);
+static void _server_response_msg_free(_server_request_t *req, _server_response_msg_t *rmsg);
+static void _server_request_free(_server_request_t *req);
+static void _server_connection_free(qdr_http1_connection_t *hconn);
+static void _write_pending_request(_server_request_t *req);
+static void _cancel_request(_server_request_t *req);
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Server Connector
+////////////////////////////////////////////////////////
+
+
+// An HttpConnector has been created.  Create an qdr_http_connection_t for it.
+// Do not create a raw connection - this is done on demand when the router
+// sends a delivery over the connector.
+//
+static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ctor,
+                                                         qd_dispatch_t *qd,
+                                                         const qd_http_bridge_config_t *bconfig)
+{
+    qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
+
+    ZERO(hconn);
+    hconn->type = HTTP1_CONN_SERVER;
+    hconn->qd_server = qd->server;
+    hconn->adaptor = qdr_http1_adaptor;
+    hconn->handler_context.handler = &_handle_connection_events;
+    hconn->handler_context.context = hconn;
+    hconn->cfg.host = qd_strdup(bconfig->host);
+    hconn->cfg.port = qd_strdup(bconfig->port);
+    hconn->cfg.address = qd_strdup(bconfig->address);
+    hconn->cfg.host_port = qd_strdup(bconfig->host_port);
+
+    // for initiating a connection to the server
+    hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_reconnect, hconn);
+
+    // Create the qdr_connection
+
+    qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
+                                                      false, //bool             is_authenticated,
+                                                      true,  //bool             opened,
+                                                      "",   //char            *sasl_mechanisms,
+                                                      QD_OUTGOING, //qd_direction_t   dir,
+                                                      hconn->cfg.host_port,    //const char      *host,
+                                                      "",    //const char      *ssl_proto,
+                                                      "",    //const char      *ssl_cipher,
+                                                      "",    //const char      *user,
+                                                      "HTTP/1.x Adaptor",    //const char      *container,
+                                                      pn_data(0),     //pn_data_t       *connection_properties,
+                                                      0,     //int              ssl_ssf,
+                                                      false, //bool             ssl,
+                                                      // set if remote is a qdrouter
+                                                      0);    //const qdr_router_version_t *version)
+
+    hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
+    hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
+                                            qdr_http1_adaptor->adaptor,
+                                            false,  // incoming
+                                            QDR_ROLE_NORMAL,
+                                            1,      // cost
+                                            hconn->conn_id,
+                                            0,  // label
+                                            0,  // remote container id
+                                            false,  // strip annotations in
+                                            false,  // strip annotations out
+                                            false,  // allow dynamic link routes
+                                            false,  // allow admin status update
+                                            DEFAULT_CAPACITY,
+                                            0,      // vhost
+                                            info,
+                                            0,      // bind context
+                                            0);     // bind token
+    qdr_connection_set_context(hconn->qdr_conn, hconn);
+
+    qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%i] HTTP connection to server created", hconn->conn_id);
+
+    // wait for the raw connection to come up before creating the in and out links
+
+    hconn->raw_conn = pn_raw_connection();
+    pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+    DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    return hconn;
+}
+
+
+// Management Agent API - Create
+//
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+    qd_http_connector_t *c = qd_http_connector(qd->server);
+    if (!c) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http connector: no memory");
+        return 0;
+    }
+
+    DEQ_ITEM_INIT(c);
+    qdr_http1_connection_t *hconn = _create_server_connection(c, qd, config);
+    if (hconn) {
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+        // activate the raw connection. This connection may be scheduled on
+        // another thread by this call:
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Initiating connection to HTTP server %s",
+               hconn->conn_id, hconn->cfg.host_port);
+        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+        return c;
+    } else {
+        qd_http_connector_decref(c);
+        c = 0;
+    }
+
+    return c;
+}
+
+
+// Management Agent API - Delete
+//
+void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
+{
+    if (ct) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
+
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+        qd_http_connector_decref(ct);
+
+        // TODO(kgiusti): do we now close all related connections?
+    }
+}
+
+
+
+
+////////////////////////////////////////////////////////
+// Raw Connector Events
+////////////////////////////////////////////////////////
+
+
+// Create the qdr links and HTTP codec when the server connection comes up.
+// These links & codec will persist across temporary drops in the connection to
+// the server (like when closing the connection to indicate end of response
+// message).  However if the connection to the server cannot be re-established
+// in a "reasonable" amount of time we consider the server unavailable and
+// these links and codec will be closed - aborting any pending requests.  Once
+// the connection to the server is reestablished these links & codec will be
+// recreated.
+//
+static void _setup_server_links(qdr_http1_connection_t *hconn)
+{
+    if (!hconn->in_link) {
+        // simulate an anonymous link for responses from the server
+        hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
+                                               QD_INCOMING,
+                                               qdr_terminus(0),  //qdr_terminus_t   *source,
+                                               qdr_terminus(0),  //qdr_terminus_t   *target
+                                               "http1.server.in",  //const char       *name,
+                                               0,                //const char       *terminus_addr,
+                                               false,
+                                               NULL,
+                                               &(hconn->in_link_id));
+        qdr_link_set_context(hconn->in_link, hconn);
+
+        qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] HTTP server response link created",
+               hconn->conn_id, hconn->in_link_id);
+    }
+
+    if (!hconn->out_link) {
+        // simulate a server subscription for its service address
+        qdr_terminus_t *source = qdr_terminus(0);
+        qdr_terminus_set_address(source, hconn->cfg.address);
+        hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
+                                                QD_OUTGOING,
+                                                source,           //qdr_terminus_t   *source,
+                                                qdr_terminus(0),  //qdr_terminus_t   *target,
+                                                "http1.server.out", //const char       *name,
+                                                0,                //const char       *terminus_addr,
+                                                false,
+                                                0,      // initial delivery
+                                                &(hconn->out_link_id));
+        qdr_link_set_context(hconn->out_link, hconn);
+
+        hconn->out_link_credit = DEFAULT_CAPACITY;
+        qdr_link_flow(hconn->adaptor->core, hconn->out_link, DEFAULT_CAPACITY, false);
+
+        qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] HTTP server request link created",
+               hconn->conn_id, hconn->out_link_id);
+    }
+
+    if (!hconn->http_conn) {
+        h1_codec_config_t config = {0};
+        config.type             = HTTP1_CONN_SERVER;
+        config.tx_buffers       = _server_tx_buffers_cb;
+        config.tx_body_data     = _server_tx_body_data_cb;
+        config.rx_request       = _server_rx_request_cb;
+        config.rx_response      = _server_rx_response_cb;
+        config.rx_header        = _server_rx_header_cb;
+        config.rx_headers_done  = _server_rx_headers_done_cb;
+        config.rx_body          = _server_rx_body_cb;
+        config.rx_done          = _server_rx_done_cb;
+        config.request_complete = _server_request_complete_cb;
+        hconn->http_conn = h1_codec_connection(&config, hconn);
+    }
+}
+
+
+// Tear down the qdr links and the codec.  This is called when the
+// connection to the server has dropped and cannot be re-established in a
+// timely manner.
+//
+static void _teardown_server_links(qdr_http1_connection_t *hconn)
+{
+    // @TODO(kgiusti): should we PN_RELEASE all unsent outbound deliveries first?
+    _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+    while (hreq) {
+        _server_request_free(hreq);
+        hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+    }
+    h1_codec_connection_free(hconn->http_conn);
+    hconn->http_conn = 0;
+
+    if (hconn->out_link) {
+        qdr_link_set_context(hconn->out_link, 0);
+        qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+        hconn->out_link = 0;
+    }
+
+    if (hconn->in_link) {
+        qdr_link_set_context(hconn->in_link, 0);
+        qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+        hconn->in_link = 0;
+    }
+}
+
+
+// This adapter attempts to keep the connection to the server up as long as the
+// connector is configured.  This is called via a timer scheduled when the
+// PN_CONNECTION_CLOSE event is handled.
+//
+static void _do_reconnect(void *context)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+    if (!hconn->raw_conn) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
+        hconn->raw_conn = pn_raw_connection();
+        pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+        // this call may reschedule the connection on another I/O thread:
+        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+    }
+}
+
+
+// Proton Raw Connection Events
+//
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+    qd_log_source_t *log = qdr_http1_adaptor->log;
+
+    qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
+
+    if (!hconn) return;
+
+    switch (pn_event_type(e)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+        hconn->server.reconnect_count = 0;
+        _setup_server_links(hconn);
+        while (qdr_connection_process(hconn->qdr_conn)) {}
+        break;
+    }
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+        // notify the codec so it can complete the current response
+        // message (response body terminated on connection closed)
+        h1_codec_connection_closed(hconn->http_conn);
+    }
+    // fall through
+    case PN_RAW_CONNECTION_CLOSED_WRITE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for %s", hconn->conn_id,
+               pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
+               ? "reading" : "writing");
+        pn_raw_connection_close(hconn->raw_conn);
+        break;
+    }
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+        pn_raw_connection_set_context(hconn->raw_conn, 0);
+        hconn->raw_conn = 0;
+        hconn->close_connection = false;
+
+        if (!hconn->qdr_conn) {
+            // the router has closed this connection so do not try to
+            // re-establish it
+            qd_log(log, QD_LOG_INFO, "[C%i] Connection closed", hconn->conn_id);
+            _server_connection_free(hconn);
+            return;
+        }
+
+        // if the current request was not completed, cancel it.  it's ok if
+        // there are outstanding *response* deliveries in flight as long as the
+        // response(s) have been completely received from the server
+        // (request_complete == true).
+
+        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq && !hreq->codec_completed && hreq->base.out_http1_octets > 0) {
+            _cancel_request(hreq);
+        }
+
+        // reconnect to the server. Leave the links intact so pending requests
+        // are not aborted.  Once we've failed to reconnect after MAX_RECONNECT
+        // tries drop the links to prevent additional request from arriving.
+        //
+        qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
+        if (hconn->server.reconnect_count == MAX_RECONNECT) {
+            qd_log(log, QD_LOG_INFO, "[C%i] Server not responding - disconnecting...", hconn->conn_id);
+            _teardown_server_links(hconn);
+        } else {
+            hconn->server.reconnect_count += 1;  // increase next sleep interval
+        }
+        qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
+        return;
+    }
+    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
+        _write_pending_request((_server_request_t*) DEQ_HEAD(hconn->requests));
+        break;
+    }
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Need read buffers", hconn->conn_id);
+        // @TODO(kgiusti): backpressure if no credit
+        // if (hconn->in_link_credit > 0 */)
+        if (!hconn->close_connection) {
+            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+                   hconn->conn_id, granted);
+        }
+        break;
+    }
+    case PN_RAW_CONNECTION_WAKE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Wake-up", hconn->conn_id);
+        while (qdr_connection_process(hconn->qdr_conn)) {}
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Connection processing complete", hconn->conn_id);
+        break;
+    }
+    case PN_RAW_CONNECTION_READ: {
+        qd_buffer_list_t blist;
+        uintmax_t length;
+        qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+
+        if (HTTP1_DUMP_BUFFERS) {
+            fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+            qd_buffer_t *bb = DEQ_HEAD(blist);
+            while (bb) {
+                fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+                bb = DEQ_NEXT(bb);
+            }
+            fflush(stdout);
+        }
+
+        if (length) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server",
+                   hconn->conn_id, hconn->in_link_id, length);
+            hconn->in_http1_octets += length;
+            int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+            if (error)
+                qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+        }
+        break;
+    }
+    case PN_RAW_CONNECTION_WRITTEN: {
+        qdr_http1_free_written_buffers(hconn);
+        break;
+    }
+    default:
+        break;
+    }
+
+    // remove me:
+    if (hconn) {
+        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
+            qd_log(log, QD_LOG_DEBUG, "   codec_completed=%s cancelled=%s",
+                   hreq->codec_completed ? "Complete" : "Not Complete",
+                   hreq->cancelled ? "Cancelled" : "Not Cancelled");
+            qd_log(log, QD_LOG_DEBUG, "   Req: dlv=%p dispo=%"PRIu64" settled=%d acked=%d",
+                   (void*) hreq->request_dlv, hreq->request_dispo, hreq->request_settled,
+                   hreq->request_acked);
+            qd_log(log, QD_LOG_DEBUG, "   Req: out_data=%d pton=%d resp-count=%d",
+                   (int) DEQ_SIZE(hreq->out_data.fifo),
+                   qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
+                   (int) DEQ_SIZE(hreq->responses));
+        }
+    }
+
+    // Check for completed or cancelled requests
+
+    bool need_close = false;
+    _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+    if (hreq) {
+
+        if (hreq->cancelled) {
+
+            // request:  have to wait until all buffers returned from proton
+            // before we can release the request delivery...
+            if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
+                return;
+
+            if (hreq->request_dlv) {
+                // let the message drain... (TODO@(kgiusti) is this necessary?
+                if (!qdr_delivery_receive_complete(hreq->request_dlv))
+                    return;
+
+                uint64_t dispo = hreq->request_dispo || PN_MODIFIED;
+                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                  hreq->request_dlv,
+                                                  dispo,
+                                                  true,   // settled
+                                                  0,      // error
+                                                  0,      // dispo data
+                                                  false);
+                qdr_delivery_set_context(hreq->request_dlv, 0);
+                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+                hreq->request_dlv = 0;
+            }
+
+            _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+            while (rmsg) {
+                if (rmsg->dlv) {
+                    qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+                    qdr_delivery_set_aborted(rmsg->dlv, true);
+                }
+                _server_response_msg_free(hreq, rmsg);
+                rmsg = DEQ_HEAD(hreq->responses);
+            }
+
+            // The state of the connection to the server will be unknown if
+            // this request was not completed.
+            if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
+                need_close = true;
+
+            _server_request_free(hreq);
+
+        } else {
+
+            // Can the request disposition be updated?  Disposition can be
+            // updated after the entire encoded request has been written to the
+            // server.
+            if (!hreq->request_acked &&
+                hreq->request_encoded &&
+                DEQ_SIZE(hreq->out_data.fifo) == 0) {
+
+                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                  hreq->request_dlv,
+                                                  hreq->request_dispo,
+                                                  false,   // settled
+                                                  0,      // error
+                                                  0,      // dispo data
+                                                  false);
+                hreq->request_acked = true;
+            }
+
+            // Can we settle request?  Settle the request delivery after all
+            // response messages have been received from the server
+            // (codec_complete).  Note that the responses may not have finished
+            // being delivered to the core (lack of credit, etc.)
+            //
+            if (!hreq->request_settled &&
+                hreq->request_acked &&  // implies out_data done
+                hreq->codec_completed) {
+
+                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                  hreq->request_dlv,
+                                                  hreq->request_dispo,
+                                                  true,   // settled
+                                                  0,      // error
+                                                  0,      // dispo data
+                                                  false);
+                // can now release the delivery
+                qdr_delivery_set_context(hreq->request_dlv, 0);
+                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+                hreq->request_dlv = 0;
+
+                hreq->request_settled = true;
+            }
+
+            // Has the entire request/response completed?  It is complete after
+            // the request message has been settled and all responses have been
+            // delivered to the core.
+            //
+            if (hreq->request_acked &&
+                hreq->request_settled &&
+                DEQ_SIZE(hreq->responses) == 0) {
+
+                qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+                _server_request_free(hreq);
+
+                hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+                if (hreq)
+                    _write_pending_request(hreq);
+            }
+        }
+    }
+
+    if (need_close) {
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+        qdr_http1_close_connection(hconn, "Request cancelled");
+    }
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// HTTP/1.x Encoder/Decoder Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+// Encoder has a buffer list to send to the server
+//
+static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
+           hconn->conn_id, hconn->out_link_id, len);
+    qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
+    if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+        _write_pending_request(hreq);
+    }
+}
+
+
+// Encoder has body data to send to the server
+//
+static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
+           hconn->conn_id, hconn->out_link_id);
+    qdr_http1_enqueue_body_data(&hreq->out_data, body_data);
+    if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) {
+        _write_pending_request(hreq);
+    }
+}
+
+
+// Server will not be sending us HTTP requests
+//
+static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
+                                 const char *method,
+                                 const char *target,
+                                 uint32_t version_major,
+                                 uint32_t version_minor)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+           "[C%"PRIu64"][L%"PRIu64"] Spurious HTTP request received from server",
+           hconn->conn_id, hconn->in_link_id);
+    return HTTP1_STATUS_BAD_REQ;
+}
+
+
+// called when decoding an HTTP response from the server.
+//
+static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
+                                  int status_code,
+                                  const char *reason_phrase,
+                                  uint32_t version_major,
+                                  uint32_t version_minor)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    // expected to be in-order
+    assert(hreq && hreq == (_server_request_t*) DEQ_HEAD(hconn->requests));
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
+           hconn->conn_id, hconn->in_link_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
+           version_major, version_minor);
+
+    _server_response_msg_t *rmsg = new__server_response_msg_t();
+    ZERO(rmsg);
+    rmsg->hreq = hreq;
+    DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+    rmsg->msg_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+    qd_compose_start_map(rmsg->msg_props);
+    {
+        char version[64];
+        snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
+        qd_compose_insert_symbol(rmsg->msg_props, RESPONSE_HEADER_KEY);
+        qd_compose_insert_string(rmsg->msg_props, version);
+
+        qd_compose_insert_symbol(rmsg->msg_props, STATUS_HEADER_KEY);
+        qd_compose_insert_int(rmsg->msg_props, (int32_t)status_code);
+
+        if (reason_phrase) {
+            qd_compose_insert_symbol(rmsg->msg_props, REASON_HEADER_KEY);
+            qd_compose_insert_string(rmsg->msg_props, reason_phrase);
+        }
+    }
+
+    return 0;
+}
+
+
+// called for each decoded HTTP header.
+//
+static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"]L%"PRIu64"] HTTP response header received: key='%s' value='%s'",
+           hconn->conn_id, hconn->in_link_id, key, value);
+
+    // expect: running incoming request at tail
+    _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    assert(rmsg);
+
+    // We need to filter the connection header out
+    // @TODO(kgiusti): also have to remove headers given in value!
+    if (strcasecmp(key, "connection") != 0) {
+        qd_compose_insert_symbol(rmsg->msg_props, key);
+        qd_compose_insert_string(rmsg->msg_props, value);
+    }
+
+    return 0;
+}
+
+
+// called after the last header is decoded, before decoding any body data.
+//
+static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64" HTTP response headers done.",
+           hconn->conn_id, hconn->in_link_id);
+
+    // expect: running incoming request at tail
+    _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    assert(rmsg && !rmsg->msg);
+
+    // start building the AMQP message
+
+    rmsg->msg = qd_message();
+
+    qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(hdrs);
+    qd_compose_insert_bool(hdrs, 0);     // durable
+    qd_compose_insert_null(hdrs);        // priority
+    //qd_compose_insert_null(hdrs);        // ttl
+    //qd_compose_insert_bool(hdrs, 0);     // first-acquirer
+    //qd_compose_insert_uint(hdrs, 0);     // delivery-count
+    qd_compose_end_list(hdrs);
+
+    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
+    qd_compose_start_list(props);
+    qd_compose_insert_null(props);     // message-id
+    qd_compose_insert_null(props);     // user-id
+    qd_compose_insert_string(props, hreq->base.response_addr); // to
+    // subject:
+    qd_compose_insert_string(props, h1_codec_request_state_method(hrs));
+    qd_compose_insert_null(props);   // reply-to
+    qd_compose_insert_ulong(props, hreq->base.msg_id);  // correlation-id
+    qd_compose_end_list(props);
+
+    qd_compose_end_map(rmsg->msg_props);
+
+    if (!has_body) {
+        // @TODO(kgiusti): fixme: tack on an empty body data performative.  The
+        // message decoder will barf otherwise
+        qd_buffer_list_t empty = DEQ_EMPTY;
+        rmsg->msg_props = qd_compose(QD_PERFORMATIVE_BODY_DATA, rmsg->msg_props);
+        qd_compose_insert_binary_buffers(rmsg->msg_props, &empty);
+    }
+
+    qd_message_compose_3(rmsg->msg, props, rmsg->msg_props, !has_body);
+    qd_compose_free(props);
+    qd_compose_free(rmsg->msg_props);
+    rmsg->msg_props = 0;
+
+    // start delivery if possible
+    if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) {
+        hconn->in_link_credit -= 1;
+
+        qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] Delivering response to router addr=%s",
+               hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+
+        qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
+        assert(addr);
+        qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
+        rmsg->dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
+        qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
+        qdr_delivery_incref(rmsg->dlv, "referenced by HTTP1 adaptor");
+        rmsg->msg = 0;  // now owned by delivery
+    }
+
+    return 0;
+}
+
+
+// Called with decoded body data.  This may be called multiple times as body
+// data becomes available.
+//
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+                              bool more)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
+
+    qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
+           hconn->conn_id, hconn->in_link_id, len);
+
+    if (offset) {
+        // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
+        // Remove the offset by shifting the content of the head buffer forward
+        //
+        qd_buffer_t *head = DEQ_HEAD(*body);
+        memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
+        head->size -= offset;
+    }
+
+    //
+    // Compose a DATA performative for this section of the stream
+    //
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+    qd_compose_insert_binary_buffers(field, body);
+
+    //
+    // Extend the streaming message and free the composed field
+    //
+    qd_message_extend(msg, field);
+    qd_compose_free(field);
+
+    //
+    // Notify the router that more data is ready to be pushed out on the delivery
+    //
+    if (!more)
+        qd_message_set_receive_complete(msg);
+
+    if (rmsg->dlv)
+        qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
+
+    return 0;
+}
+
+// Called at the completion of response decoding.
+//
+static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
+
+    qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response receive complete.",
+           hconn->conn_id, hconn->in_link_id);
+
+    rmsg->rx_complete = true;
+
+    if (!qd_message_receive_complete(msg)) {
+        qd_message_set_receive_complete(msg);
+        if (rmsg->dlv) {
+            qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
+        }
+    }
+
+    if (rmsg->dlv) {
+        // We've finished the delivery, and don't care about outcome/settlement
+        _server_response_msg_free(hreq, rmsg);
+    }
+}
+
+
+// called at the completion of a full Request/Response exchange, or as a result
+// of cancelling the request.  The hrs will be deleted on return from this
+// call.  Any hrs related state must be released before returning from this
+// callback.
+//
+// Note: in the case where the request had multiple response messages, this
+// call occurs when the LAST response has been completely received
+// (_server_rx_done_cb())
+//
+static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled)
+{
+    _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+    hreq->base.lib_rs = 0;
+    hreq->cancelled = hreq->cancelled || cancelled;
+    hreq->codec_completed = !hreq->cancelled;
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"] HTTP request/response %s.", hconn->conn_id,
+           cancelled ? "cancelled!" : "codec done");
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// Router Protocol Adapter Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+// credit has been granted - responses may now be sent to the
+// router core.
+//
+void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t    *adaptor,
+                                     qdr_http1_connection_t *hconn,
+                                     qdr_link_t             *link,
+                                     int                     credit)
+{
+    assert(link == hconn->in_link);   // router only grants flow on incoming link
+
+    assert(qdr_link_is_anonymous(link));  // remove me
+    hconn->in_link_credit += credit;
+
+    qd_log(adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Credit granted on response link: %d",
+           hconn->conn_id, hconn->in_link_id, hconn->in_link_credit);
+
+    if (hconn->in_link_credit > 0) {
+
+        if (hconn->raw_conn)
+            qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+
+        // check for pending responses that are blocked for credit
+
+        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+            _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+            while (rmsg && rmsg->msg && hconn->in_link_credit > 0) {
+                assert(!rmsg->dlv);
+                hconn->in_link_credit -= 1;
+
+                qd_log(adaptor->log, QD_LOG_TRACE,
+                       "[C%"PRIu64"][L%"PRIu64"] Delivering blocked response to router addr=%s",
+                       hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+
+                qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
+                qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
+                qdr_delivery_t *dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
+                if (!rmsg->rx_complete) {
+                    // stop here since response must be complete before we can deliver the next one.
+                    rmsg->dlv = dlv;
+                    qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
+                    qdr_delivery_incref(rmsg->dlv, "referenced by HTTP1 adaptor");
+                    rmsg->msg = 0;
+                    break;
+                }
+
+                // the delivery is complete no need to save it
+                _server_response_msg_free(hreq, rmsg);
+                rmsg = DEQ_HEAD(hreq->responses);
+            }
+        }
+    }
+}
+
+
+// Handle disposition/settlement update for the outstanding HTTP response.
+//
+void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
+                                           qdr_http1_connection_t   *hconn,
+                                           qdr_http1_request_base_t *hbase,
+                                           qdr_delivery_t           *dlv,
+                                           uint64_t                  disp,
+                                           bool                      settled)
+{
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response delivery update, outcome=0x%"PRIx64"%s",
+           hconn->conn_id, hconn->in_link_id, disp, settled ? " settled": "");
+
+    // Not much can be done with error dispositions (I think)
+    if (disp != PN_ACCEPTED) {
+        qd_log(adaptor->log, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] response message not received, outcome=0x%"PRIx64,
+               hconn->conn_id, hconn->in_link_id, disp);
+    }
+}
+
+
+//
+// Request message forwarding
+//
+
+
+// Create a request context for a new request in msg, which is valid to a depth
+// of at least QD_DEPTH_PROPERTIES
+//
+static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, qd_message_t *msg)
+{
+    uint64_t msg_id = 0;
+    char *reply_to = 0;
+    bool ok = false;
+    qd_parsed_field_t *msg_id_pf = 0;
+
+    qd_iterator_t *msg_id_itr = qd_message_field_iterator_typed(msg, QD_FIELD_MESSAGE_ID);  // ulong
+    if (msg_id_itr) {
+        msg_id_pf = qd_parse(msg_id_itr);
+        if (msg_id_pf && qd_parse_ok(msg_id_pf)) {
+            msg_id = qd_parse_as_ulong(msg_id_pf);
+            ok = qd_parse_ok(msg_id_pf);
+        }
+    }
+    qd_parse_free(msg_id_pf);
+    qd_iterator_free(msg_id_itr);
+
+    if (!ok) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] Rejecting message missing id.",
+               hconn->conn_id, hconn->out_link_id);
+        return 0;
+    }
+
+    qd_iterator_t *reply_to_itr = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+    reply_to = (char*) qd_iterator_copy(reply_to_itr);
+    qd_iterator_free(reply_to_itr);
+
+    assert(reply_to && strlen(reply_to));  // remove me
+    if (!reply_to) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
+               hconn->conn_id, hconn->out_link_id);
+        return 0;
+    }
+
+    _server_request_t *hreq = new__server_request_t();
+    ZERO(hreq);
+    hreq->base.hconn = hconn;
+    hreq->base.msg_id = msg_id;
+    hreq->base.response_addr = reply_to;
+    DEQ_INIT(hreq->out_data.fifo);
+    DEQ_INIT(hreq->responses);
+    DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg_id=%"PRIu64" reply-to=%s.",
+           hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
+    return hreq;
+}
+
+
+// Start a new request to the server.  msg has been validated to at least
+// application properties depth.  Returns 0 on success.
+//
+static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg)
+{
+    // start encoding HTTP request.  Need method, target and version
+
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    char *method_str = 0;
+    char *target_str = 0;
+    qd_parsed_field_t *app_props = 0;
+    uint32_t major = 1;
+    uint32_t minor = 1;
+    uint64_t outcome = 0;
+
+    assert(!hreq->base.lib_rs);
+
+    // method is passed in the SUBJECT field
+    qd_iterator_t *method_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
+    if (!method_iter) {
+        return PN_REJECTED;
+    }
+
+    method_str = (char*) qd_iterator_copy(method_iter);
+    qd_iterator_free(method_iter);
+    if (!method_str) {
+        return PN_REJECTED;
+    }
+
+    // target, version info and other headers are in the app properties
+    qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+    if (!app_props_iter) {
+        outcome = PN_REJECTED;
+        goto exit;
+    }
+
+    app_props = qd_parse(app_props_iter);
+    qd_iterator_free(app_props_iter);
+    if (!app_props) {
+        outcome = PN_REJECTED;
+        goto exit;
+    }
+
+    qd_parsed_field_t *ref = qd_parse_value_by_key(app_props, TARGET_HEADER_KEY);
+    target_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
+    if (!target_str) {
+        outcome = PN_REJECTED;
+        goto exit;
+    }
+
+
+    // Pull the version info from the app properties (e.g. "1.1")
+    ref = qd_parse_value_by_key(app_props, REQUEST_HEADER_KEY);
+    if (ref) {  // optional
+        char *version_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
+        if (version_str)
+            sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
+        free(version_str);
+    }
+
+    // done copying and converting!
+
+    qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Encoding request method=%s target=%s",
+           hconn->conn_id, hconn->out_link_id, method_str, target_str);
+
+    hreq->base.lib_rs = h1_codec_tx_request(hconn->http_conn, method_str, target_str, major, minor);
+    if (!hreq->base.lib_rs) {
+        outcome = PN_REJECTED;
+        goto exit;
+    }
+
+    h1_codec_request_state_set_context(hreq->base.lib_rs, (void*) hreq);
+
+    // now send all headers in app properties
+    qd_parsed_field_t *key = qd_field_first_child(app_props);
+    bool ok = true;
+    while (ok && key) {
+        qd_parsed_field_t *value = qd_field_next_child(key);
+        if (!value)
+            break;
+
+        qd_iterator_t *i_key = qd_parse_raw(key);
+        if (!i_key)
+            break;
+
+        // ignore the special headers added by the mapping
+        if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
+            qd_iterator_t *i_value = qd_parse_raw(value);
+            if (!i_value)
+                break;
+
+            char *header_key = (char*) qd_iterator_copy(i_key);
+            char *header_value = (char*) qd_iterator_copy(i_value);
+
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] Encoding request header %s:%s",
+                   hconn->conn_id, hconn->out_link_id,
+                   header_key, header_value);
+
+            ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
+
+            free(header_key);
+            free(header_value);
+        }
+
+        key = qd_field_next_child(value);
+    }
+
+    if (!ok)
+        outcome = PN_REJECTED;
+
+exit:
+
+    free(method_str);
+    free(target_str);
+    qd_parse_free(app_props);
+
+    return outcome;
+}
+
+
+// Encode an outbound AMQP message as an HTTP Request.  Returns PN_ACCEPTED
+// when complete, 0 if incomplete and PN_REJECTED if encoding error.
+//
+static uint64_t _encode_request_message(_server_request_t *hreq)
+{
+    qdr_http1_connection_t    *hconn = hreq->base.hconn;
+    qd_message_t                *msg = qdr_delivery_message(hreq->request_dlv);
+    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
+
+    if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+        return 0;
+
+    if (status == QD_MESSAGE_DEPTH_INVALID) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
+               hconn->conn_id, hconn->out_link_id);
+        return PN_REJECTED;
+    }
+
+    assert(status == QD_MESSAGE_DEPTH_OK);
+
+    if (!hreq->headers_encoded) {
+        uint64_t outcome = _send_request_headers(hreq, msg);
+        if (outcome) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message.", hconn->conn_id, hconn->out_link_id);
+            return outcome;
+        }
+        hreq->headers_encoded = true;
+    }
+
+    qd_message_body_data_t *body_data = 0;
+
+    while (true) {
+        switch (qd_message_next_body_data(msg, &body_data)) {
+        case QD_MESSAGE_BODY_DATA_OK: {
+
+            qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] Encoding request body data",
+                   hconn->conn_id, hconn->out_link_id);
+
+            if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
+                       hconn->conn_id, hconn->out_link_id);
+                return PN_REJECTED;
+            }
+            break;
+        }
+
+        case QD_MESSAGE_BODY_DATA_NO_MORE:
+            // indicate this message is complete
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
+                   hconn->conn_id, hconn->out_link_id);
+            return PN_ACCEPTED;
+
+        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] body data need more",
+                   hconn->conn_id, hconn->out_link_id);
+            return 0;  // wait for more
+
+        case QD_MESSAGE_BODY_DATA_INVALID:
+        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
+                   hconn->conn_id, hconn->out_link_id);
+            return PN_REJECTED;
+        }
+    }
+}
+
+
+// The router wants to send this delivery out the link. This is either the
+// start of a new incoming HTTP request or the continuation of an existing one.
+// Note: returning a non-zero value will cause the delivery to be settled!
+//
+uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
+                                            qdr_http1_connection_t *hconn,
+                                            qdr_link_t             *link,
+                                            qdr_delivery_t         *delivery,
+                                            bool                    settled)
+{
+    qd_message_t         *msg = qdr_delivery_message(delivery);
+    _server_request_t   *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
+
+    if (!hreq) {
+        // new delivery - create new request:
+        switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
+        case QD_MESSAGE_DEPTH_INCOMPLETE:
+            return 0;
+
+        case QD_MESSAGE_DEPTH_INVALID:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
+                   hconn->conn_id, link->identity);
+            qd_message_set_send_complete(msg);
+            qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+            return PN_REJECTED;
+
+        case QD_MESSAGE_DEPTH_OK:
+            hreq = _create_request_context(hconn, msg);
+            if (!hreq) {
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
+                qd_message_set_send_complete(msg);
+                qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+                return PN_REJECTED;
+            }
+
+            hreq->request_dlv = delivery;
+            qdr_delivery_set_context(delivery, (void*) hreq);
+            qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
+            break;
+        }
+    }
+
+    if (!hreq->request_dispo)
+        hreq->request_dispo = _encode_request_message(hreq);
+
+    if (hreq->request_dispo && qd_message_receive_complete(msg)) {
+
+        qd_message_set_send_complete(msg);
+        qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+
+        if (hreq->request_dispo == PN_ACCEPTED) {
+            hreq->request_encoded = true;
+            h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
+
+        } else {
+            // mapping to HTTP request failed:
+            _cancel_request(hreq);
+        }
+    }
+
+    return 0;
+}
+
+
+//
+// Misc
+//
+
+// free the response message
+//
+static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg)
+{
+    DEQ_REMOVE(hreq->responses, rmsg);
+    qd_message_free(rmsg->msg);
+    qd_compose_free(rmsg->msg_props);
+    if (rmsg->dlv) {
+        qdr_delivery_set_context(rmsg->dlv, 0);
+        qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 adaptor response freed");
+    }
+    free__server_response_msg_t(rmsg);
+}
+
+
+// Release the request
+//
+static void _server_request_free(_server_request_t *hreq)
+{
+    if (hreq) {
+        qdr_http1_request_base_cleanup(&hreq->base);
+        if (hreq->request_dlv) {
+            qdr_delivery_set_context(hreq->request_dlv, 0);
+            qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request freed");
+        }
+
+        qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+
+        _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+        while (rmsg) {
+            _server_response_msg_free(hreq, rmsg);
+            rmsg = DEQ_HEAD(hreq->responses);
+        }
+
+        free__server_request_t(hreq);
+    }
+}
+
+
+static void _write_pending_request(_server_request_t *hreq)
+{
+    if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+        assert(DEQ_PREV(&hreq->base) == 0);  // preserve order!
+        uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
+        hreq->base.out_http1_octets += written;
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%i] %"PRIu64" octets written",
+               hreq->base.hconn->conn_id, written);
+    }
+}
+
+
+static void _server_connection_free(qdr_http1_connection_t *hconn)
+{
+    for (_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+         hreq;
+         hreq = (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+        _server_request_free(hreq);
+    }
+    qdr_http1_connection_free(hconn);
+}
+
+
+static void _cancel_request(_server_request_t *hreq)
+{
+    if (!hreq->base.lib_rs) {
+        // never even got to encoding it - manually mark it cancelled
+        hreq->cancelled = true;
+    } else {
+        // cleanup codec state - this will call _server_request_complete_cb()
+        // with cancelled = true
+        h1_codec_request_state_cancel(hreq->base.lib_rs);
+    }
+
+    // cleanup occurs at the end of the connection event handler
+}
diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c
index c650875..6eccdad 100644
--- a/src/adaptors/http_common.c
+++ b/src/adaptors/http_common.c
@@ -43,10 +43,10 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t
     config->address = qd_entity_get_string(entity, "address");         CHECK();
     version_str     = qd_entity_get_string(entity, "protocolVersion");  CHECK();
 
-    if (strncmp(version_str, "HTTP/1", 6) == 0) {
-        config->version = VERSION_HTTP1;
-    } else {
+    if (strcmp(version_str, "HTTP2") == 0) {
         config->version = VERSION_HTTP2;
+    } else {
+        config->version = VERSION_HTTP1;
     }
     free(version_str);
     version_str = 0;
diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h
index e8a7ff9..2339146 100644
--- a/src/adaptors/http_common.h
+++ b/src/adaptors/http_common.h
@@ -66,7 +66,8 @@ struct qd_http_connector_t {
     qd_server_t                  *server;
     qd_timer_t                   *timer;
     long                          delay;
-    struct qdr_http_connection_t *dispatcher;  // pseudo egress connection
+    struct qdr_http_connection_t *dispatcher;
+
     DEQ_LINKS(qd_http_connector_t);
 };
 DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
diff --git a/src/message.c b/src/message.c
index 286f523..c5d8725 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2444,7 +2444,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
     //
     int idx = 0;
     while (idx < count && !!buffer) {
-        buffers[idx].context  = 0;
+        buffers[idx].context  = 0;  // reserved for use by caller - do not modify!
         buffers[idx].bytes    = (char*) qd_buffer_base(buffer) + (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
         buffers[idx].capacity = BUFFER_SIZE;
         buffers[idx].size     = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 828e464..c10ccb4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -150,6 +150,7 @@ foreach(py_test_module
     system_tests_routing_protocol
     system_tests_open_properties
     system_tests_http2
+    system_tests_http1_adaptor
     )
 
   add_test(${py_test_module} ${TEST_WRAP} ${PYTHON_TEST_COMMAND} -v ${py_test_module})
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
new file mode 100644
index 0000000..0e6eb81
--- /dev/null
+++ b/tests/system_tests_http1_adaptor.py
@@ -0,0 +1,722 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Test the HTTP/1.x Adaptor
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+import sys
+from threading import Thread
+try:
+    from http.server import HTTPServer, BaseHTTPRequestHandler
+    from http.client import HTTPConnection
+    from http.client import HTTPException
+except ImportError:
+    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+    from httplib import HTTPConnection, HTTPException
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from system_test import TestCase, unittest, main_module, Qdrouterd
+from system_test import TIMEOUT, Logger
+
+
+class RequestMsg(object):
+    """
+    A 'hardcoded' HTTP request message.  This class writes its request
+    message to the HTTPConnection.
+    """
+    def __init__(self, method, target, headers=None, body=None):
+        self.method = method
+        self.target = target
+        self.headers = headers or {}
+        self.body = body
+
+    def send_request(self, conn):
+        conn.putrequest(self.method, self.target)
+        for key, value in self.headers.items():
+            conn.putheader(key, value)
+        conn.endheaders()
+        if self.body:
+            conn.send(self.body)
+
+
+class ResponseMsg(object):
+    """
+    A 'hardcoded' HTTP response message.  This class writes its response
+    message when called by the HTTPServer via the BaseHTTPRequestHandler
+    """
+    def __init__(self, status, version=None, reason=None,
+                 headers=None, body=None, eom_close=False, error=False):
+        self.status = status
+        self.version = version or "HTTP/1.1"
+        self.reason = reason
+        self.headers = headers or []
+        self.body = body
+        self.eom_close = eom_close
+        self.error = error
+
+    def send_response(self, handler):
+        handler.protocol_version = self.version
+        if self.error:
+            handler.send_error(self.status,
+                               message=self.reason,
+                               explain=self.body)
+            return
+
+        handler.send_response(self.status, self.reason)
+        for key, value in self.headers.items():
+            handler.send_header(key, value)
+        handler.end_headers()
+
+        if self.body:
+            handler.wfile.write(self.body)
+            handler.wfile.flush()
+
+        return self.eom_close
+
+
+class ResponseValidator(object):
+    """
+    Validate a response as received by the HTTP client
+    """
+    def __init__(self, status=200, expect_headers=None, expect_body=None):
+        if expect_headers is None:
+            expect_headers = {}
+        self.status = status
+        self.expect_headers = expect_headers
+        self.expect_body = expect_body
+
+    def check_response(self, rsp):
+        if self.status and rsp.status != self.status:
+            raise Exception("Bad response code, expected %s got %s"
+                            % (self.status, rsp.status))
+        for key, value in self.expect_headers.items():
+            if rsp.getheader(key) != value:
+                raise Exception("Missing/bad header (%s), expected %s got %s"
+                                % (key, value, rsp.getheader(key)))
+
+        body = rsp.read()
+        if (self.expect_body and self.expect_body != body):
+            raise Exception("Bad response body expected %s got %s"
+                            % (self.expect_body, body))
+        return body
+
+
+DEFAULT_TEST_SCENARIOS = {
+
+    #
+    # GET
+    #
+
+    "GET": [
+        (RequestMsg("GET", "/GET/error",
+                    headers={"Content-Length": 0}),
+         ResponseMsg(400, reason="Bad breath", error=True),
+         ResponseValidator(status=400)),
+
+        (RequestMsg("GET", "/GET/content_len",
+                    headers={"Content-Length": "00"}),
+         ResponseMsg(200, reason="OK",
+                     headers={"Content-Length": 1,
+                              "Content-Type": "text/plain;charset=utf-8"},
+                     body=b'?'),
+         ResponseValidator(expect_headers={'Content-Length': '1'},
+                           expect_body=b'?')),
+
+        (RequestMsg("GET", "/GET/content_len_511",
+                    headers={"Content-Length": 0}),
+         ResponseMsg(200, reason="OK",
+                     headers={"Content-Length": 511,
+                              "Content-Type": "text/plain;charset=utf-8"},
+                     body=b'X' * 511),
+         ResponseValidator(expect_headers={'Content-Length': '511'},
+                           expect_body=b'X' * 511)),
+
+        (RequestMsg("GET", "/GET/content_len_4096",
+                    headers={"Content-Length": 0}),
+         ResponseMsg(200, reason="OK",
+                     headers={"Content-Length": 4096,
+                              "Content-Type": "text/plain;charset=utf-8"},
+                     body=b'X' * 4096),
+         ResponseValidator(expect_headers={'Content-Length': '4096'},
+                           expect_body=b'X' * 4096)),
+
+        (RequestMsg("GET", "/GET/chunked",
+                    headers={"Content-Length": 0}),
+         ResponseMsg(200, reason="OK",
+                     headers={"transfer-encoding": "chunked",
+                              "Content-Type": "text/plain;charset=utf-8"},
+                     # note: the chunk length does not count the trailing CRLF
+                     body=b'16\r\n'
+                     + b'Mary had a little pug \r\n'
+                     + b'1b\r\n'
+                     + b'Its name was "Skupper-Jack"\r\n'
+                     + b'0\r\n'
+                     + b'Optional: Trailer\r\n'
+                     + b'Optional: Trailer\r\n'
+                     + b'\r\n'),
+         ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
+                           expect_body=b'Mary had a little pug Its name was "Skupper-Jack"')),
+
+        (RequestMsg("GET", "/GET/chunked_large",
+                    headers={"Content-Length": 0}),
+         ResponseMsg(200, reason="OK",
+                     headers={"transfer-encoding": "chunked",
+                              "Content-Type": "text/plain;charset=utf-8"},
+                     # note: the chunk length does not count the trailing CRLF
+                     body=b'1\r\n'
+                     + b'?\r\n'
+                     + b'800\r\n'
+                     + b'X' * 0x800 + b'\r\n'
+                     + b'13\r\n'
+                     + b'Y' * 0x13  + b'\r\n'
+                     + b'0\r\n'
+                     + b'Optional: Trailer\r\n'
+                     + b'Optional: Trailer\r\n'
+                     + b'\r\n'),
+         ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
+                           expect_body=b'?' + b'X' * 0x800 + b'Y' * 0x13)),
+
+        (RequestMsg("GET", "/GET/info_content_len",
+                    headers={"Content-Length": 0}),
+         [ResponseMsg(100, reason="Continue",
+                      headers={"Blab": 1, "Blob": "?"}),
+          ResponseMsg(200, reason="OK",
+                      headers={"Content-Length": 1,
+                               "Content-Type": "text/plain;charset=utf-8"},
+                      body=b'?')],
+         ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
+                           expect_body=b'?')),
+
+        (RequestMsg("GET", "/GET/no_length",
+                      headers={"Content-Length": "0"}),
+         ResponseMsg(200, reason="OK",
+                     headers={"Content-Type": "text/plain;charset=utf-8",
+                              #         ("connection", "close")
+                     },
+                     body=b'Hi! ' * 1024 + b'X',
+                     eom_close=True),
+         ResponseValidator(expect_body=b'Hi! ' * 1024 + b'X')),
+    ],
+
+    #
+    # HEAD
+    #
+
+    "HEAD": [
+        (RequestMsg("HEAD", "/HEAD/test_01",
+                    headers={"Content-Length": "0"}),
+         ResponseMsg(200, headers={"App-Header-1": "Value 01",
+                                   "Content-Length": "10",
+                                   "App-Header-2": "Value 02"},
+                     body=None),
+         ResponseValidator(expect_headers={"App-Header-1": "Value 01",
+                                           "Content-Length": "10",
+                                           "App-Header-2": "Value 02"})
+        ),
+        (RequestMsg("HEAD", "/HEAD/test_02",
+                      headers={"Content-Length": "0"}),
+         ResponseMsg(200, headers={"App-Header-1": "Value 01",
+                                   "Transfer-Encoding": "chunked",
+                                   "App-Header-2": "Value 02"}),
+         ResponseValidator(expect_headers={"App-Header-1": "Value 01",
+                                        "Transfer-Encoding": "chunked",
+                                        "App-Header-2": "Value 02"})),
+
+        (RequestMsg("HEAD", "/HEAD/test_03",
+                    headers={"Content-Length": "0"}),
+         ResponseMsg(200, headers={"App-Header-3": "Value 03"}, eom_close=True),
+         ResponseValidator(expect_headers={"App-Header-3": "Value 03"})),
+    ],
+
+    #
+    # POST
+    #
+
+    "POST": [
+        (RequestMsg("POST", "/POST/test_01",
+                    headers={"App-Header-1": "Value 01",
+                             "Content-Length": "18",
+                             "Content-Type": "application/x-www-form-urlencoded"},
+                    body=b'one=1&two=2&three=3'),
+         ResponseMsg(200, reason="OK",
+                     headers={"Response-Header": "whatever",
+                              "Transfer-Encoding": "chunked"},
+                     body=b'8\r\n'
+                     + b'12345678\r\n'
+                     + b'f\r\n'
+                     + b'abcdefghijklmno\r\n'
+                     + b'000\r\n'
+                     + b'\r\n'),
+         ResponseValidator(expect_body=b'12345678abcdefghijklmno')
+        ),
+        (RequestMsg("POST", "/POST/test_02",
+                    headers={"App-Header-1": "Value 01",
+                              "Transfer-Encoding": "chunked"},
+                    body=b'01\r\n'
+                    + b'!\r\n'
+                    + b'0\r\n\r\n'),
+         ResponseMsg(200, reason="OK",
+                     headers={"Response-Header": "whatever",
+                             "Content-Length": "9"},
+                     body=b'Hi There!',
+                     eom_close=True),
+         ResponseValidator(expect_body=b'Hi There!')
+        ),
+    ],
+
+    #
+    # PUT
+    #
+
+    "PUT": [
+        (RequestMsg("PUT", "/PUT/test_01",
+                    headers={"Put-Header-1": "Value 01",
+                             "Transfer-Encoding": "chunked",
+                             "Content-Type": "text/plain;charset=utf-8"},
+                    body=b'80\r\n'
+                    + b'$' * 0x80 + b'\r\n'
+                    + b'0\r\n\r\n'),
+         ResponseMsg(201, reason="Created",
+                     headers={"Response-Header": "whatever",
+                              "Content-length": "3"},
+                     body=b'ABC'),
+         ResponseValidator(status=201, expect_body=b'ABC')
+        ),
+
+        (RequestMsg("PUT", "/PUT/test_02",
+                    headers={"Put-Header-1": "Value 01",
+                             "Content-length": "0",
+                             "Content-Type": "text/plain;charset=utf-8"}),
+         ResponseMsg(201, reason="Created",
+                     headers={"Response-Header": "whatever",
+                              "Transfer-Encoding": "chunked"},
+                     body=b'1\r\n$\r\n0\r\n\r\n',
+                     eom_close=True),
+         ResponseValidator(status=201, expect_body=b'$')
+        ),
+    ]
+}
+
+
+class RequestHandler(BaseHTTPRequestHandler):
+    """
+    Dispatches requests received by the HTTPServer based on the method
+    """
+    protocol_version = 'HTTP/1.1'
+    def _execute_request(self, tests):
+        for req, resp, val in tests:
+            if req.target == self.path:
+                self._consume_body()
+                if not isinstance(resp, list):
+                    resp = [resp]
+                for r in resp:
+                    r.send_response(self)
+                    if r.eom_close:
+                        self.close_connection = 1
+                        self.server.system_test_server_done = True
+                return
+        self.send_error(404, "Not Found")
+
+    def do_GET(self):
+        self._execute_request(self.server.system_tests["GET"])
+
+    def do_HEAD(self):
+        self._execute_request(self.server.system_tests["HEAD"])
+
+    def do_POST(self):
+        if self.path == "/SHUTDOWN":
+            self.close_connection = True
+            self.server.system_test_server_done = True
+            self.send_response(200, "OK")
+            self.send_header("Content-Length", "13")
+            self.end_headers()
+            self.wfile.write(b'Server Closed')
+            self.wfile.flush()
+            return
+        self._execute_request(self.server.system_tests["POST"])
+
+    def do_PUT(self):
+        self._execute_request(self.server.system_tests["PUT"])
+
+    # these overrides just quiet the test output
+    # comment them out to help debug:
+    def log_request(self, code=None, size=None):
+        pass
+
+    def log_error(self, format=None, *args):
+        pass
+
+    def log_message(self, format=None, *args):
+        pass
+
+    def _consume_body(self):
+        """
+        Read the entire body off the rfile.  This must be done to allow
+        multiple requests on the same socket
+        """
+        if self.command == 'HEAD':
+            return b''
+
+        for key, value in self.headers.items():
+            if key.lower() == 'content-length':
+                return self.rfile.read(int(value))
+
+            if key.lower() == 'transfer-encoding'  \
+               and 'chunked' in value.lower():
+                body = b''
+                while True:
+                    header = self.rfile.readline().strip().split(b';')[0]
+                    data = self.rfile.readline().rstrip()
+                    body += data
+                    if int(header) == 0:
+                        break;
+                return body
+        return self.rfile.read()
+
+
+class MyHTTPServer(HTTPServer):
+    """
+    Adds a switch to the HTTPServer to allow it to exit gracefully
+    """
+    def __init__(self, addr, handler_cls, testcases=None):
+        if testcases is None:
+            testcases = DEFAULT_TEST_SCENARIOS
+        self.system_test_server_done = False
+        self.system_tests = testcases
+        super(MyHTTPServer, self).__init__(addr, handler_cls)
+
+
+class TestServer(object):
+    """
+    A HTTPServer running in a separate thread
+    """
+    def __init__(self, port=8080, tests=None):
+        self._logger = Logger(title="TestServer", print_to_console=False)
+        self._server_addr = ("", port)
+        self._server = MyHTTPServer(self._server_addr, RequestHandler, tests)
+        self._server.allow_reuse_address = True
+        self._thread = Thread(target=self._run)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _run(self):
+        self._logger.log("TestServer listening on %s:%s" % self._server_addr)
+        try:
+            while not self._server.system_test_server_done:
+                self._server.handle_request()
+        except Exception as exc:
+            self._logger.log("TestServer %s:%s crash: %s" %
+                             (self._server_addr, exc))
+            raise
+        self._logger.log("TestServer %s:%s closed" % self._server_addr)
+
+    def wait(self, timeout=TIMEOUT):
+        self._logger.log("TestServer %s:%s shutting down" % self._server_addr)
+        self._thread.join(timeout=TIMEOUT)
+        if self._server:
+            self._server.server_close()
+
+
+class ThreadedTestClient(object):
+    """
+    An HTTP client running in a separate thread
+    """
+    def __init__(self, tests, port, repeat=1):
+        self._conn_addr = ("127.0.0.1:%s" % port)
+        self._tests = tests
+        self._repeat = repeat
+        self._logger = Logger(title="TestClient", print_to_console=False)
+        self._thread = Thread(target=self._run)
+        self._thread.daemon = True
+        self.error = None
+        self._thread.start()
+
+    def _run(self):
+        self._logger.log("TestClient connecting on %s" % self._conn_addr)
+        client = HTTPConnection(self._conn_addr, timeout=TIMEOUT)
+        for loop in range(self._repeat):
+            for op, tests in self._tests.items():
+                for req, _, val in tests:
+                    self._logger.log("TestClient sending request")
+                    req.send_request(client)
+                    self._logger.log("TestClient getting response")
+                    rsp = client.getresponse()
+                    self._logger.log("TestClient response received")
+                    if val:
+                        try:
+                            body = val.check_response(rsp)
+                        except Exception as exc:
+                            self._logger.log("TestClient response invalid: %s",
+                                             str(exc))
+                            self.error = "client failed: %s" % str(exc)
+                            return
+
+                        if req.method is "BODY" and body != b'':
+                            self._logger.log("TestClient response invalid: %s",
+                                             "body present!")
+                            self.error = "error: body present!"
+                            return
+
+        client.close()
+        self._logger.log("TestClient to %s closed" % self._conn_addr)
+
+    def wait(self, timeout=TIMEOUT):
+        self._thread.join(timeout=TIMEOUT)
+        self._logger.log("TestClient %s shut down" % self._conn_addr)
+
+
+class Http1AdaptorOneRouterTest(TestCase):
+    """
+    Test an HTTP server and client attached to a standalone router
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(Http1AdaptorOneRouterTest, cls).setUpClass()
+
+        def router(name, mode, extra):
+            config = [
+                ('router', {'mode': mode,
+                            'id': name,
+                            'allowUnsettledMulticast': 'yes'}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            return cls.routers[-1]
+
+        # configuration:
+        # interior
+        #
+        #  +----------------+
+        #  |     INT.A      |
+        #  +----------------+
+        #      ^         ^
+        #      |         |
+        #      V         V
+        #  <client>  <server>
+
+        cls.routers = []
+        cls.http_server_port = cls.tester.get_port()
+        cls.http_listener_port = cls.tester.get_port()
+
+        router('INT.A', 'standalone',
+               [('httpConnector', {'port': cls.http_server_port,
+                                   'protocolVersion': 'HTTP1',
+                                   'address': 'testServer'}),
+                ('httpListener', {'port': cls.http_listener_port,
+                                  'protocolVersion': 'HTTP1',
+                                  'address': 'testServer'})
+               ])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+    def _do_request(self, tests):
+        server = TestServer(port=self.http_server_port)
+        for req, _, val in tests:
+            client = HTTPConnection("127.0.0.1:%s" % self.http_listener_port,
+                                    timeout=TIMEOUT)
+            req.send_request(client)
+            rsp = client.getresponse()
+            try:
+                body = val.check_response(rsp)
+            except Exception as exc:
+                self.fail("request failed:  %s" % str(exc))
+
+            if req.method is "BODY":
+                self.assertEqual(b'', body)
+
+            client.close()
+        server.wait()
+
+    def test_001_get(self):
+        self._do_request(DEFAULT_TEST_SCENARIOS["GET"])
+
+    def test_002_head(self):
+        self._do_request(DEFAULT_TEST_SCENARIOS["HEAD"])
+
+    def test_003_post(self):
+        self._do_request(DEFAULT_TEST_SCENARIOS["POST"])
+
+    def test_004_put(self):
+        self._do_request(DEFAULT_TEST_SCENARIOS["PUT"])
+
+
+class Http1AdaptorInteriorTest(TestCase):
+    """
+    Test an HTTP server connected to an interior router serving multiple HTTP
+    clients
+    """
+    TESTS = {
+        "PUT": [
+            (RequestMsg("PUT", "/PUT/test",
+                        headers={"Header-1": "Value",
+                                 "Header-2": "Value",
+                                 "Content-Length": "20",
+                                 "Content-Type": "text/plain;charset=utf-8"},
+                        body=b'!' * 20),
+             ResponseMsg(201, reason="Created",
+                         headers={"Response-Header": "data",
+                                  "Content-Length": "0"}),
+             ResponseValidator(status=201)
+            )],
+
+        "POST": [
+            (RequestMsg("POST", "/POST/test",
+                        headers={"Header-1": "X",
+                                 "Content-Length": "11",
+                                 "Content-Type": "application/x-www-form-urlencoded"},
+                        body=b'one=1' + b'&two=2'),
+             ResponseMsg(200, reason="OK",
+                         headers={"Response-Header": "whatever",
+                                  "Content-Length": 10},
+                         body=b'0123456789'),
+             ResponseValidator()
+            )],
+
+        "GET": [
+            (RequestMsg("GET", "/GET/test",
+                        headers={"Content-Length": "000"}),
+             ResponseMsg(200, reason="OK",
+                         headers={"Content-Length": "655",
+                                  "Content-Type": "text/plain;charset=utf-8"},
+                         body=b'?' * 655),
+             ResponseValidator(expect_headers={'Content-Length': '655'},
+                               expect_body=b'?' * 655)
+            )],
+
+        "PUT": [
+            (RequestMsg("PUT", "/PUT/chunked",
+                        headers={"Transfer-Encoding": "chunked",
+                                 "Content-Type": "text/plain;charset=utf-8"},
+                        body=b'16\r\n' + b'!' * 0x16 + b'\r\n'
+                        + b'0\r\n\r\n'),
+             ResponseMsg(204, reason="No Content",
+                        headers={"Content-Length": "000"}),
+             ResponseValidator(status=204)
+            )],
+    }
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(Http1AdaptorInteriorTest, cls).setUpClass()
+
+        def router(name, mode, extra):
+            config = [
+                ('router', {'mode': mode,
+                            'id': name,
+                            'allowUnsettledMulticast': 'yes'}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            return cls.routers[-1]
+
+        # configuration:
+        # one edge, one interior
+        #
+        #  +-------+    +---------+
+        #  |  EA1  |<==>|  INT.A  |
+        #  +-------+    +---------+
+        #      ^             ^
+        #      |             |
+        #      V             V
+        #  <clients>      <server>
+
+        cls.routers = []
+        cls.INTA_edge_port   = cls.tester.get_port()
+        cls.http_server_port = cls.tester.get_port()
+        cls.http_listener_port = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               [('listener', {'role': 'edge', 'port': cls.INTA_edge_port}),
+                ('httpConnector', {'port': cls.http_server_port,
+                                   'protocolVersion': 'HTTP1',
+                                   'address': 'testServer'})
+               ])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+        router('EA1', 'edge',
+               [('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge_port}),
+                ('httpListener', {'port': cls.http_listener_port,
+                                  'protocolVersion': 'HTTP1',
+                                  'address': 'testServer'})
+               ])
+        cls.EA1 = cls.routers[1]
+        cls.EA1.listener = cls.EA1.addresses[0]
+
+        cls.EA1.wait_connectors()
+        cls.INT_A.wait_address('EA1')
+
+
+    def test_01_load(self):
+        """
+        Test multiple clients running as fast as possible
+        """
+        server = TestServer(port=self.http_server_port, tests=self.TESTS)
+
+        clients = []
+        for _ in range(5):
+            clients.append(ThreadedTestClient(self.TESTS,
+                                              self.http_listener_port,
+                                              repeat=2))
+        for client in clients:
+            client.wait()
+            self.assertIsNone(client.error)
+
+        # terminate the server thread by sending a request
+        # with eom_close set
+
+        client = ThreadedTestClient({"POST": [(RequestMsg("POST",
+                                                          "/SHUTDOWN",
+                                                          {"Content-Length": "0"}),
+                                                   None,
+                                                   None)]},
+                                    self.http_listener_port)
+        client.wait()
+        self.assertIsNone(client.error)
+
+        server.wait()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org