You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/08/10 18:28:03 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1744: refactor common HTTP code

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new c20b29a  DISPATCH-1744: refactor common HTTP code
c20b29a is described below

commit c20b29a1f70a507fd04a26b2d1692aa522fdb9b1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Aug 6 10:22:35 2020 -0400

    DISPATCH-1744: refactor common HTTP code
    
    This closes #815
---
 include/qpid/dispatch/http1_lib.h             |  118 +++
 python/qpid_dispatch/management/qdrouter.json |   20 +
 src/CMakeLists.txt                            |    3 +
 src/adaptors/http1/http1_adaptor.c            |   56 +
 src/adaptors/http1/http1_lib.c                | 1377 +++++++++++++++++++++++++
 src/adaptors/http_adaptor.c                   |  130 +--
 src/adaptors/http_adaptor.h                   |   36 +-
 src/adaptors/http_common.c                    |  243 +++++
 src/adaptors/http_common.h                    |  108 ++
 tests/system_tests_qdmanage.py                |    2 +-
 10 files changed, 1947 insertions(+), 146 deletions(-)

diff --git a/include/qpid/dispatch/http1_lib.h b/include/qpid/dispatch/http1_lib.h
new file mode 100644
index 0000000..58385e0
--- /dev/null
+++ b/include/qpid/dispatch/http1_lib.h
@@ -0,0 +1,118 @@
+#ifndef http1_lib_H
+#define http1_lib_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/dispatch/buffer.h>
+
+#include <inttypes.h>
+
+#define HTTP1_VERSION_1_1  "HTTP/1.1"
+#define HTTP1_VERSION_1_0  "HTTP/1.0"
+
+typedef struct http1_conn_t     http1_conn_t;
+typedef struct http1_transfer_t http1_transfer_t;
+
+
+typedef enum {
+    HTTP1_CONN_CLIENT,  // connection initiated by client
+    HTTP1_CONN_SERVER,  // connection to server
+} http1_conn_type_t;
+
+typedef enum {
+    HTTP1_STATUS_BAD_REQ = 400,
+    HTTP1_STATUS_SERVER_ERR = 500,
+    HTTP1_STATUS_BAD_VERSION = 505,
+} http1_status_code_t;
+
+typedef struct http1_conn_config_t {
+
+    http1_conn_type_t type;
+
+    // called with output data to write to the network
+    void (*conn_tx_data)(http1_conn_t *conn, qd_buffer_list_t *data, size_t offset, unsigned int len);
+
+    // @TODO(kgiusti) - remove?
+    //void (*conn_error)(http1_conn_t *conn, int code, const char *reason);
+
+    //
+    // RX message callbacks
+    //
+
+    // HTTP request received - new transfer created (xfer).  This xfer must be
+    // supplied in the http1_response() method
+    int (*xfer_rx_request)(http1_transfer_t *xfer,
+                           const char *method,
+                           const char *target,
+                           const char *version);
+
+    // HTTP response received - the transfer comes from the return value of the
+    // corresponding http1_request method.  Note well that if status_code is
+    // Informational (1xx) then this response is NOT the last response for the
+    // current request (See RFC7231, 6.2 Informational 1xx).  The xfer_done
+    // callback will be called after the LAST response has been received.
+    //
+    int (*xfer_rx_response)(http1_transfer_t *xfer,
+                            const char *version,
+                            int status_code,
+                            const char *reason_phrase);
+
+    int (*xfer_rx_header)(http1_transfer_t *xfer, const char *key, const char *value);
+    int (*xfer_rx_headers_done)(http1_transfer_t *xfer);
+
+    int (*xfer_rx_body)(http1_transfer_t *xfer, qd_buffer_list_t *body, size_t offset, size_t len);
+
+    void (*xfer_rx_done)(http1_transfer_t *xfer);
+
+    // Invoked when the request/response(s) exchange has completed
+    //
+    void (*xfer_done)(http1_transfer_t *xfer);
+} http1_conn_config_t;
+
+
+http1_conn_t *http1_connection(http1_conn_config_t *config, void *context);
+void http1_connection_close(http1_conn_t *conn);
+void *http1_connection_get_context(http1_conn_t *conn);
+
+// push inbound network data into the http1 library
+int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len);
+
+
+//
+// API for sending HTTP/1.1 messages
+//
+void http1_transfer_set_context(http1_transfer_t *xfer, void *context);
+void *http1_transfer_get_context(const http1_transfer_t *xfer);
+http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer);
+
+// initiate a request - this creates a new message transfer context
+http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version);
+
+// respond to a received request - the transfer context should be from the corresponding xfer_rx_request callback
+int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase);
+
+int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value);
+int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len);
+int http1_tx_done(http1_transfer_t *xfer);
+
+
+
+
+
+#endif // http1_lib_H
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index dc1dcb7..421fa0a 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1103,6 +1103,16 @@
                     "required": false,
                     "description": "Name of the sslProfile..",
                     "create": true
+                },
+                "protocolVersion": {
+                    "description": "The version of the HTTP protocol supported by this listener.",
+                    "type": [
+                        "HTTP/1.x",
+                        "HTTP/2.0"
+                    ],
+                    "default": "HTTP/1.x",
+                    "required": false,
+                    "create": true
                 }
             }
         },
@@ -1127,6 +1137,16 @@
                     "type": "string",
                     "create": true
 
+                },
+                "protocolVersion": {
+                    "description": "The version of the HTTP protocol supported by this connector.",
+                    "type": [
+                        "HTTP/1.x",
+                        "HTTP/2.0"
+                    ],
+                    "default": "HTTP/1.x",
+                    "required": false,
+                    "create": true
                 }
             }
         },
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 753436b..f7ac67e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,7 +39,10 @@ add_custom_command (
 # Build the qpid-dispatch library.
 set(qpid_dispatch_SOURCES
   adaptors/reference_adaptor.c
+  adaptors/http_common.c
   adaptors/http_adaptor.c
+  adaptors/http1/http1_lib.c
+  adaptors/http1/http1_adaptor.c
   alloc_pool.c
   amqp.c
   bitmask.c
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
new file mode 100644
index 0000000..beb9eb4
--- /dev/null
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/http1_lib.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+#include <qpid/dispatch/message.h>
+#include "adaptors/http_common.h"
+
+#include <stdio.h>
+#include <inttypes.h>
+
+
+typedef struct qd_http1_adaptor_t {
+    qdr_core_t               *core;
+    qdr_protocol_adaptor_t   *adaptor;
+    qd_http_lsnr_list_t       listeners;
+    qd_http_connector_list_t  connectors;
+    qd_log_source_t          *log;
+} qd_http1_adaptor_t;
+
+//static qd_http1_adaptor_t *http1_adaptor;
+
+#define BUFF_BATCH 16
+
+
+// dummy for now:
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+    return 0;
+}
+
+void qd_http1_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) {}
+
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+    return 0;
+}
+
+void qd_http1_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *conn) {}
+
diff --git a/src/adaptors/http1/http1_lib.c b/src/adaptors/http1/http1_lib.c
new file mode 100644
index 0000000..991d492
--- /dev/null
+++ b/src/adaptors/http1/http1_lib.c
@@ -0,0 +1,1377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/dispatch/http1_lib.h>
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/alloc_pool.h>
+
+#include <ctype.h>
+#include <stdio.h>
+#include <string.h>
+
+
+const uint8_t CR_TOKEN = '\r';
+const uint8_t LF_TOKEN = '\n';
+const char   *CRLF = "\r\n";
+
+const qd_iterator_pointer_t NULL_I_PTR = {0};
+
+// true for informational response codes
+#define IS_INFO_RESPONSE(code) ((code) / 100 == 1)
+
+typedef enum {
+    HTTP1_MSG_STATE_START = 0, // parsing start-line
+    HTTP1_MSG_STATE_HEADERS,   // parsing headers
+    HTTP1_MSG_STATE_BODY,      // parsing body
+    HTTP1_MSG_STATE_DONE,      // parsing complete
+} http1_msg_state_t;
+
+
+typedef enum {
+    HTTP1_CHUNK_HEADER = 0,    // waiting for chunk header
+    HTTP1_CHUNK_DATA,          // reading chunk data
+    HTTP1_CHUNK_TRAILERS,      // reading until lone CRLF
+} http1_chunk_state_t;
+
+
+typedef struct scratch_buffer_t {
+    uint8_t *buf;
+    size_t   size;  // of buffer, not contents!
+} scratch_buffer_t;
+
+
+// state for a single request-response transaction
+//
+struct http1_transfer_t {
+    DEQ_LINKS(struct http1_transfer_t);
+    void         *context;
+    http1_conn_t *conn;
+    uint32_t      response_code;
+
+    bool close_on_done;     // true if connection must be closed when transfer completes
+    bool is_head_method;    // true if request method is HEAD
+    bool is_connect_method; // true if request method is CONNECT TODO(kgiusti): supported?
+};
+DEQ_DECLARE(http1_transfer_t, http1_transfer_list_t);
+ALLOC_DECLARE(http1_transfer_t);
+ALLOC_DEFINE(http1_transfer_t);
+
+
+// The HTTP/1.1 connection
+//
+struct http1_conn_t {
+    void *context;
+
+    // http requests are added to tail,
+    // in-progress response is at head
+    http1_transfer_list_t xfers;
+
+    // Decoder for current incoming msg.
+    //
+    // incoming: holds the raw data received by the proactor from this
+    // connection.
+    //
+    // read_ptr: points to the next octet to be decoded on the incoming buffer
+    // list. Remaining is the length of the raw data to be decoded.
+    //
+    // body_ptr: points to the first unconsumed octet of the message
+    // body. Remaining is the number of octets that may be consumed.
+    // Invariant: body_ptr.buffer always points to the incoming.head as body
+    // data is being parsed.
+    //
+    struct decoder_t {
+        qd_buffer_list_t       incoming;
+        qd_iterator_pointer_t  read_ptr;
+        qd_iterator_pointer_t  body_ptr;
+
+        http1_transfer_t      *xfer;            // current transfer
+        http1_msg_state_t      state;
+        scratch_buffer_t       scratch;
+        int                    error;
+        const char            *error_msg;
+
+        uint64_t               content_length;
+        http1_chunk_state_t    chunk_state;
+        uint64_t               chunk_length;
+
+        bool is_request;
+        bool is_chunked;
+        bool is_1_0;
+
+        // decoded headers
+        bool hdr_transfer_encoding;
+        bool hdr_content_length;
+    } decoder;
+
+    // Encoder for current outgoing msg.
+    // outgoing: holds the encoded data that needs to be sent to proactor for
+    // sending out this connection
+    // write_ptr: points to the first empty octet to be written to by the
+    // encoder.  Remaining is the total unused space in the outgoing list
+    // (capacity)
+    // Note that the outgoing list and the write_ptr are only used for the
+    // start line and headers.  Body content buffer chains are past directly to
+    // the connection without encoding.
+    //
+    struct encoder_t {
+        qd_buffer_list_t       outgoing;
+        qd_iterator_pointer_t  write_ptr;
+        http1_transfer_t      *xfer;           // current transfer
+
+        bool is_request;
+        bool crlf_sent;    // true if the CRLF after headers has been sent
+    } encoder;
+
+    http1_conn_config_t config;
+};
+ALLOC_DECLARE(http1_conn_t);
+ALLOC_DEFINE(http1_conn_t);
+
+static void decoder_reset(struct decoder_t *d);
+static void encoder_reset(struct encoder_t *e);
+
+
+// Create a new transfer - this is done when a new http request occurs
+// Keep oldest outstanding tranfer at DEQ_HEAD(conn->xfers)
+static http1_transfer_t *http1_transfer(http1_conn_t *conn)
+{
+    http1_transfer_t *xfer = new_http1_transfer_t();
+    ZERO(xfer);
+    xfer->conn = conn;
+    DEQ_INSERT_TAIL(conn->xfers, xfer);
+    return xfer;
+}
+
+
+static void http1_transfer_free(http1_transfer_t *xfer)
+{
+    if (xfer) {
+        http1_conn_t *conn = xfer->conn;
+        assert(conn->decoder.xfer != xfer);
+        assert(conn->encoder.xfer != xfer);
+        DEQ_REMOVE(conn->xfers, xfer);
+        free_http1_transfer_t(xfer);
+    }
+}
+
+
+http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
+{
+    http1_conn_t *conn = new_http1_conn_t();
+    ZERO(conn);
+
+    conn->context = context;
+    conn->config = *config;
+    DEQ_INIT(conn->xfers);
+
+    encoder_reset(&conn->encoder);
+    DEQ_INIT(conn->encoder.outgoing);
+    conn->encoder.write_ptr = NULL_I_PTR;
+
+    decoder_reset(&conn->decoder);
+    DEQ_INIT(conn->decoder.incoming);
+    conn->decoder.read_ptr = NULL_I_PTR;
+
+    return conn;
+}
+
+
+// Close the connection conn.
+//
+// This cancels all outstanding transfers and destroys the connection.  If
+// there is an incoming response message body being parsed when this function
+// is invoked it will signal the end of the message.
+//
+void http1_connection_close(http1_conn_t *conn)
+{
+    if (conn) {
+        struct decoder_t *decoder = &conn->decoder;
+        if (!decoder->error) {
+            if (decoder->state == HTTP1_MSG_STATE_BODY
+                && decoder->xfer) {
+
+                // terminate the incoming message as the server has closed the
+                // connection to indicate the end of the message
+                conn->config.xfer_rx_done(decoder->xfer);
+            }
+
+            // notify any outstanding transfers
+            http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+            while (xfer) {
+                conn->config.xfer_done(xfer);
+                xfer = DEQ_NEXT(xfer);
+            }
+        }
+
+        decoder_reset(&conn->decoder);
+        encoder_reset(&conn->encoder);
+        qd_buffer_list_free_buffers(&conn->decoder.incoming);
+        qd_buffer_list_free_buffers(&conn->encoder.outgoing);
+        free(conn->decoder.scratch.buf);
+
+        http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+        while (xfer) {
+            http1_transfer_free(xfer);  // removes from conn->xfers list
+            xfer = DEQ_HEAD(conn->xfers);
+        }
+
+        free_http1_conn_t(conn);
+    }
+}
+
+
+// reset the rx decoder state after message received
+//
+static void decoder_reset(struct decoder_t *decoder)
+{
+    // do not touch the read_ptr or incoming buffer list as they
+    // track the current position in the incoming data stream
+
+    decoder->body_ptr = NULL_I_PTR;
+    decoder->xfer = 0;
+    decoder->state = HTTP1_MSG_STATE_START;
+    decoder->content_length = 0;
+    decoder->chunk_state = HTTP1_CHUNK_HEADER;
+    decoder->chunk_length = 0;
+    decoder->error = 0;
+    decoder->error_msg = 0;
+
+    decoder->is_request = false;
+    decoder->is_chunked = false;
+    decoder->is_1_0 = false;
+
+    decoder->hdr_transfer_encoding = false;
+    decoder->hdr_content_length = false;
+}
+
+// reset the tx encoder after message sent
+static void encoder_reset(struct encoder_t *encoder)
+{
+    // do not touch the write_ptr or the outgoing queue as there may be more messages to send.
+    encoder->xfer = 0;
+    encoder->is_request = false;
+    encoder->crlf_sent = false;
+}
+
+
+// ensure the encoder has at least capacity octets available
+//
+static void ensure_outgoing_capacity(struct encoder_t *encoder, size_t capacity)
+{
+    while (encoder->write_ptr.remaining < capacity) {
+        qd_buffer_t *buf = qd_buffer();
+        DEQ_INSERT_TAIL(encoder->outgoing, buf);
+        encoder->write_ptr.remaining += qd_buffer_capacity(buf);
+    }
+    if (!encoder->write_ptr.buffer) {
+        encoder->write_ptr.buffer = DEQ_HEAD(encoder->outgoing);
+        encoder->write_ptr.cursor = qd_buffer_cursor(encoder->write_ptr.buffer);
+    }
+}
+
+
+// Write a C string to the encoder.
+//
+static void write_string(struct encoder_t *encoder, const char *string)
+{
+    size_t needed = strlen(string);
+    ensure_outgoing_capacity(encoder, needed);
+
+    qd_iterator_pointer_t *wptr = &encoder->write_ptr;
+    while (needed) {
+        if (qd_buffer_capacity(wptr->buffer) == 0) {
+            wptr->buffer = DEQ_NEXT(wptr->buffer);
+            wptr->cursor = qd_buffer_base(wptr->buffer);
+        }
+
+        size_t avail = MIN(needed, qd_buffer_capacity(wptr->buffer));
+        memcpy(wptr->cursor, string, avail);
+        qd_buffer_insert(wptr->buffer, avail);
+        wptr->cursor += avail;
+        wptr->remaining -= avail;
+        string += avail;
+        needed -= avail;
+    }
+}
+
+
+//
+static inline size_t skip_octets(qd_iterator_pointer_t *data, size_t amount)
+{
+    size_t count = 0;
+    amount = MIN(data->remaining, amount);
+    while (count < amount) {
+        if (data->cursor == qd_buffer_cursor(data->buffer)) {
+            data->buffer = DEQ_NEXT(data->buffer);
+            assert(data->buffer);  // else data->remaining is bad
+            data->cursor = qd_buffer_base(data->buffer);
+        }
+        size_t available = qd_buffer_cursor(data->buffer) - data->cursor;
+        available = MIN(available, amount - count);
+        data->cursor += available;
+        count += available;
+    }
+    data->remaining -= amount;
+    return amount;
+}
+
+// consume next octet and advance the pointer
+static inline bool get_octet(qd_iterator_pointer_t *data, uint8_t *octet)
+{
+    if (data->remaining > 0) {
+        if (data->cursor == qd_buffer_cursor(data->buffer)) {
+            data->buffer = DEQ_NEXT(data->buffer);
+            data->cursor = qd_buffer_base(data->buffer);
+        }
+        *octet = *data->cursor;
+        data->cursor += 1;
+        data->remaining -= 1;
+        return true;
+    }
+    return false;
+}
+
+
+// True if line contains just "CRLF"
+//
+static bool is_empty_line(const qd_iterator_pointer_t *line)
+{
+    if (line->remaining == 2) {
+        qd_iterator_pointer_t tmp = *line;
+        uint8_t octet;
+        return (get_octet(&tmp, &octet) && octet == CR_TOKEN
+                && get_octet(&tmp, &octet) && octet == LF_TOKEN);
+    }
+    return false;
+}
+
+static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_pointer_t *ptr)
+{
+    qd_iterator_pointer_t tmp = *ptr;
+    fprintf(stdout, "%s '", prefix);
+    size_t len = MIN(tmp.remaining, 80);
+    uint8_t octet;
+    while (len-- > 0 && get_octet(&tmp, &octet)) {
+        fputc(octet, stdout);
+    }
+    fprintf(stdout, "%s'\n", (tmp.remaining) ? " <truncated>" : "");
+    fflush(stdout);
+}
+
+
+// read a CRLF terminated line starting at 'data'.
+// On success, 'data' is advanced to the octet following the LF and 'line' is
+// set to the read line (including trailing CRLF).  Returns false if no CRLF found
+//
+static bool read_line(qd_iterator_pointer_t *data, qd_iterator_pointer_t *line)
+{
+    qd_iterator_pointer_t tmp = *data;
+
+    *line = *data;
+    line->remaining = 0;
+
+    bool   eol = false;
+
+    uint8_t octet;
+    while (!eol && get_octet(&tmp, &octet)) {
+        line->remaining += 1;
+        if (octet == CR_TOKEN) {
+            if (get_octet(&tmp, &octet)) {
+                line->remaining += 1;
+                if (octet == LF_TOKEN) {
+                    eol = true;
+                }
+            }
+        }
+    }
+
+    if (eol) {
+        *data = tmp;
+        return true;
+    } else {
+        *line = NULL_I_PTR;
+        return false;
+    }
+}
+
+
+static bool ensure_scratch_size(scratch_buffer_t *b, size_t required)
+{
+    if (b->size < required) {
+        if (b->buf)
+            free(b->buf);
+        b->size = required;
+        b->buf = malloc(b->size);
+    }
+
+    // @TODO(kgiusti): deal with malloc failure
+    return true;
+}
+
+
+// trims any optional whitespace characters at the start of 'line'
+// RFC7230 defines OWS as zero or more spaces or horizontal tabs
+//
+static void trim_whitespace(qd_iterator_pointer_t *line)
+{
+    qd_iterator_pointer_t ptr = *line;
+    size_t skip = 0;
+    uint8_t octet;
+    while (get_octet(&ptr, &octet) && isblank(octet))
+        skip += 1;
+    if (skip)
+        skip_octets(line, skip);
+}
+
+// copy out iterator to a buffer and null terminate. Return # of bytes written
+// to str including terminating null.
+static size_t pointer_2_str(const qd_iterator_pointer_t *line, unsigned char *str, size_t len)
+{
+    assert(len);
+    qd_iterator_pointer_t tmp = *line;
+    uint8_t *ptr = (uint8_t *)str;
+    len -= 1;  // reserve for null terminator
+    while (len-- > 0 && get_octet(&tmp, ptr))
+        ++ptr;
+    *ptr++ = 0;
+    return ptr - (uint8_t *)str;
+}
+
+
+// Parse out a token as defined by RFC7230 and store the result in 'token'.
+// 'line' is advanced past the token.  This is used for parsing fields that
+// RFC7230 defines as 'tokens'.
+//
+static bool parse_token(qd_iterator_pointer_t *line, qd_iterator_pointer_t *token)
+{
+    static const char *TOKEN_EXTRA = "!#$%&’*+-.^_‘|~";
+
+    trim_whitespace(line);
+    qd_iterator_pointer_t tmp = *line;
+    *token = tmp;
+    size_t len = 0;
+    uint8_t octet;
+    while (get_octet(&tmp, &octet)
+           && (('A' <= octet && octet <= 'Z') ||
+               ('a' <= octet && octet <= 'z') ||
+               ('0' <= octet && octet <= '9') ||
+               (strchr(TOKEN_EXTRA, octet)))) {
+        len++;
+    }
+
+    if (len) {
+        token->remaining = len;
+        skip_octets(line, len);
+        return true;
+    }
+    *token = NULL_I_PTR;
+    return false;
+}
+
+
+// Parse out a text field delineated by whitespace.
+// 'line' is advanced past the field.
+//
+static bool parse_field(qd_iterator_pointer_t *line, qd_iterator_pointer_t *field)
+{
+    trim_whitespace(line);
+    qd_iterator_pointer_t tmp = *line;
+    *field = tmp;
+    size_t len = 0;
+    uint8_t octet;
+    while (get_octet(&tmp, &octet) && !isspace(octet))
+        len++;
+
+    if (len) {
+        field->remaining = len;
+        skip_octets(line, len);
+        return true;
+    }
+    *field = NULL_I_PTR;
+    return false;
+}
+
+
+// parse the HTTP/1.1 request line:
+// "method SP request-target SP HTTP-version CRLF"
+//
+static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+{
+    qd_iterator_pointer_t method = {0};
+    qd_iterator_pointer_t target = {0};
+    qd_iterator_pointer_t version = {0};
+
+    if (!parse_token(line, &method) ||
+        !parse_field(line, &target) ||
+        !parse_field(line, &version)) {
+
+        decoder->error_msg = "Malformed request line";
+        decoder->error = HTTP1_STATUS_BAD_REQ;
+        return decoder->error;
+    }
+
+    // translate iterator pointers to C strings
+    ensure_scratch_size(&decoder->scratch, method.remaining + target.remaining + version.remaining + 3);
+    uint8_t *ptr = decoder->scratch.buf;
+    size_t avail = decoder->scratch.size;
+
+    uint8_t *method_str = ptr;
+    size_t offset = pointer_2_str(&method, method_str, avail);
+    ptr += offset;
+    avail -= offset;
+
+    uint8_t *target_str = ptr;
+    offset = pointer_2_str(&target, target_str, avail);
+    ptr += offset;
+    avail += offset;
+
+    uint8_t *version_str = ptr;
+    pointer_2_str(&version, version_str, avail);
+
+    uint32_t major = 0;
+    uint32_t minor = 0;
+    if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) {
+        decoder->error_msg = "Malformed version in request";
+        decoder->error = HTTP1_STATUS_BAD_REQ;
+        return decoder->error;
+    }
+
+    if (major != 1 || minor > 1) {
+        decoder->error_msg = "Unsupported HTTP version";
+        decoder->error = HTTP1_STATUS_BAD_VERSION;
+        return decoder->error;
+    }
+
+    http1_transfer_t *xfer = http1_transfer(conn);
+
+    // check for methods that do not support body content in the response:
+    if (strcmp((char*)method_str, "HEAD") == 0)
+        xfer->is_head_method = true;
+    else if (strcmp((char*)method_str, "CONNECT") == 0)
+        xfer->is_connect_method = true;
+
+    decoder->xfer = xfer;
+    decoder->is_request = true;
+    decoder->is_1_0 = (minor == 0);
+
+    decoder->error = conn->config.xfer_rx_request(xfer, (char*)method_str, (char*)target_str, (char*)version_str);
+    if (decoder->error)
+        decoder->error_msg = "xfer_rx_request callback error";
+    return decoder->error;
+}
+
+
+// parse the HTTP/1.1 response line
+// "HTTP-version SP status-code [SP reason-phrase] CRLF"
+//
+static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+{
+    qd_iterator_pointer_t version = {0};
+    qd_iterator_pointer_t status_code = {0};
+    qd_iterator_pointer_t reason = {0};
+
+    if (!parse_field(line, &version)
+        || !parse_field(line, &status_code)
+        || status_code.remaining != 3) {
+
+        decoder->error_msg = "Malformed response status line";
+        decoder->error = HTTP1_STATUS_SERVER_ERR;
+        return decoder->error;
+    }
+
+    // Responses arrive in the same order as requests are generated so this new
+    // response corresponds to head xfer
+    http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+    if (!xfer) {
+        // receiving a response without a corresponding request
+        decoder->error_msg = "Spurious HTTP response received";
+        decoder->error = HTTP1_STATUS_SERVER_ERR;
+        return decoder->error;
+    }
+
+    assert(!decoder->xfer);   // state machine violation
+    assert(xfer->response_code == 0);
+
+    decoder->xfer = xfer;
+
+    unsigned char code_str[4];
+    pointer_2_str(&status_code, code_str, 4);
+    xfer->response_code = atoi((char*) code_str);
+
+    // the reason phrase is optional, and may contain spaces
+
+    reason = *line;
+    if (reason.remaining >= 2)  // expected for CRLF
+        reason.remaining -= 2;
+    trim_whitespace(&reason);
+
+    // convert to C strings
+    ensure_scratch_size(&decoder->scratch, version.remaining + reason.remaining + 2);
+    uint8_t *ptr = decoder->scratch.buf;
+    size_t avail = decoder->scratch.size;
+
+    uint8_t *version_str = ptr;
+    size_t offset = pointer_2_str(&version, version_str, avail);
+    ptr += offset;
+    avail -= offset;
+
+    uint8_t *reason_str = ptr;
+    offset = pointer_2_str(&reason, reason_str, avail);
+
+    uint32_t major = 0;
+    uint32_t minor = 0;
+    if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) {
+        decoder->error_msg = "Malformed version in response";
+        decoder->error = HTTP1_STATUS_SERVER_ERR;
+        return decoder->error;
+    }
+
+    if (major != 1 || minor > 1) {
+        decoder->error_msg = "Unsupported HTTP version";
+        decoder->error = HTTP1_STATUS_BAD_VERSION;
+        return decoder->error;
+    }
+
+    decoder->is_request = false;
+    decoder->is_1_0 = (minor == 0);
+
+    decoder->error = conn->config.xfer_rx_response(decoder->xfer, (char*)version_str,
+                                                   xfer->response_code,
+                                                   (offset) ? (char*)reason_str: 0);
+    if (decoder->error)
+        decoder->error_msg = "xfer_rx_response callback error";
+
+    return decoder->error;
+}
+
+
+// parse the first line of an incoming http message
+//
+static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t line;
+
+    if (read_line(rptr, &line)) {
+        debug_print_iterator_pointer("start line:", &line);
+
+        if (!is_empty_line(&line)) {  // RFC7230: ignore any preceding CRLF
+            if (conn->config.type == HTTP1_CONN_CLIENT) {
+                parse_request_line(conn, decoder, &line);
+            } else {
+                parse_response_line(conn, decoder, &line);
+            }
+            conn->decoder.state = HTTP1_MSG_STATE_HEADERS;
+        }
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for more input
+}
+
+//
+// Header parsing
+//
+
+// Called after the last incoming header was decoded and passed to the
+// application
+//
+static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    // Flush all buffers processed so far - no longer needed
+
+    qd_buffer_t *head = DEQ_HEAD(decoder->incoming);
+    while (head && head != decoder->read_ptr.buffer) {
+        DEQ_REMOVE_HEAD(decoder->incoming);
+        qd_buffer_free(head);
+        head = DEQ_HEAD(decoder->incoming);
+    }
+
+    // perform any post-headers validation:
+
+    if (decoder->is_request) {
+        if (decoder->hdr_transfer_encoding && !decoder->is_chunked) {
+            // RFC7230 Message Body Length: If a Transfer-Encoding header field
+            // is present in a request and the chunked transfer coding is not
+            // the final encoding, the message body length cannot be determined
+            // reliably; the server MUST respond with the 400 (Bad Request)
+            // status code and then close the connection.
+            decoder->error_msg = "Non-chunked Tranfer-Encoding in request";
+            decoder->error = HTTP1_STATUS_BAD_REQ;
+            return false;
+        }
+    }
+
+    decoder->error = conn->config.xfer_rx_headers_done(decoder->xfer);
+    if (decoder->error) {
+        decoder->error_msg = "xfer_rx_headers_done callback error";
+        return false;
+    }
+
+    // determine if a body is present (ref RFC7230 sec 3.3.3 Message Body Length)
+    bool has_body;
+    if (decoder->is_request) {
+        // an http request will have a body ONLY if either chunked transfer or
+        // non-zero Content-Length header was given.
+        has_body = (decoder->is_chunked || decoder->content_length);
+
+    } else {
+        // An HTTP response has a body if request method is NOT HEAD or CONNECT AND
+        // the response code indicates a body.  A body will either have a specific
+        // size via Content-Length or chunked encoder, OR its length is unspecified
+        // and the message body is terminated by closing the connection.
+        //
+        http1_transfer_t *xfer = decoder->xfer;
+        has_body = !(xfer->is_head_method       ||
+                     xfer->is_connect_method    ||
+                     xfer->response_code == 204 ||     // No Content
+                     xfer->response_code == 205 ||     // Reset Content
+                     xfer->response_code == 304 ||     // Not Modified
+                     IS_INFO_RESPONSE(xfer->response_code));
+        if (has_body) {
+            // no body if explicit Content-Length of zero
+            if (decoder->hdr_content_length && decoder->content_length == 0) {
+                has_body = false;
+            }
+        }
+    }
+
+    if (has_body) {
+        // start tracking the body buffer chain
+        decoder->body_ptr = decoder->read_ptr;
+        decoder->body_ptr.remaining = 0;
+        decoder->state = HTTP1_MSG_STATE_BODY;
+    } else {
+        decoder->state = HTTP1_MSG_STATE_DONE;
+    }
+
+    return !!decoder->read_ptr.remaining;
+}
+
+
+// process a received header to determine message body length, etc.
+//
+static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
+{
+    int parse_error = decoder->is_request ? HTTP1_STATUS_BAD_REQ : HTTP1_STATUS_SERVER_ERR;
+
+    if (strcasecmp("Content-Length", (char*) key) == 0) {
+        uint64_t old = decoder->content_length;
+        if (sscanf((char*)value, "%"PRIu64, &decoder->content_length) != 1) {
+            decoder->error_msg = "Malformed Content-Length header";
+            decoder->error = parse_error;
+            return decoder->error;
+        }
+        if (old && old != decoder->content_length) {
+            decoder->error_msg = "Invalid duplicate Content-Length header";
+            decoder->error = parse_error;
+            return decoder->error;
+        }
+        decoder->hdr_content_length = true;
+
+    } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
+        // check if "chunked" is present and it is the last item in the value
+        // string.  And remember kids: coding type names are case insensitive!
+        // Also note "value" has already been trimmed of whitespace at both
+        // ends.
+        size_t len = strlen((char*)value);
+        if (len >= 7) {  // 7 = strlen("chunked")
+            const char *ptr = ((char*) value) + len - 7;
+            decoder->is_chunked = strcasecmp("chunked", ptr) == 0;
+        }
+        decoder->hdr_transfer_encoding = true;
+    }
+
+    return 0;
+}
+
+
+// Parse out the header key and value
+//
+static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t line;
+    http1_transfer_t *xfer = decoder->xfer;
+    assert(xfer);  // else state machine busted
+
+    if (read_line(rptr, &line)) {
+        debug_print_iterator_pointer("header:", &line);
+
+        if (is_empty_line(&line)) {
+            // end of headers
+            return process_headers_done(conn, decoder);
+        }
+
+        qd_iterator_pointer_t key = {0};
+
+        if (!parse_token(&line, &key)) {
+            decoder->error_msg = "Malformed Header";
+            decoder->error = (decoder->is_request) ? HTTP1_STATUS_BAD_REQ
+                : HTTP1_STATUS_SERVER_ERR;
+            return false;
+        }
+
+        // advance line past the ':'
+        uint8_t octet;
+        while (get_octet(&line, &octet) && octet != ':')
+            ;
+
+        // line now contains the value. convert to C strings and post callback
+        ensure_scratch_size(&decoder->scratch, key.remaining + line.remaining + 2);
+        uint8_t *ptr = decoder->scratch.buf;
+        size_t avail = decoder->scratch.size;
+
+        uint8_t *key_str = ptr;
+        size_t offset = pointer_2_str(&key, key_str, avail);
+        ptr += offset;
+        avail -= offset;
+
+        uint8_t *value_str = ptr;
+        pointer_2_str(&line, value_str, avail);
+
+        // trim whitespace on both ends of value
+        while (isspace(*value_str))
+            ++value_str;
+        ptr = value_str + strlen((char*) value_str);
+        while (ptr-- > value_str) {
+            if (!isspace(*ptr))
+                break;
+            *ptr = 0;
+        }
+
+        process_header(conn, decoder, key_str, value_str);
+
+        if (!decoder->error) {
+            decoder->error = conn->config.xfer_rx_header(xfer, (char *)key_str, (char *)value_str);
+            if (decoder->error)
+                decoder->error_msg = "xfer_rx_header callback error";
+        }
+
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for more data
+}
+
+//
+// Chunked body encoding parser
+//
+
+
+// Pass message body data up to the application.
+//
+static inline int consume_body_data(http1_conn_t *conn, bool flush)
+{
+    struct decoder_t *decoder = &conn->decoder;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+
+    // shortcut: if no more data to parse send the entire incoming chain
+    if (rptr->remaining == 0) {
+
+        decoder->error = conn->config.xfer_rx_body(decoder->xfer, &decoder->incoming,
+                                                   body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
+                                                   body_ptr->remaining);
+        DEQ_INIT(decoder->incoming);
+        *body_ptr = NULL_I_PTR;
+        *rptr = NULL_I_PTR;
+        return decoder->error;
+    }
+
+    // The read pointer points to somewhere in the buffer chain that contains some
+    // unparsed data.  Send any buffers preceding the current read pointer.
+    qd_buffer_list_t blist = DEQ_EMPTY;
+    size_t octets = 0;
+    const size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+
+    // invariant:
+    assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
+
+    while (body_ptr->buffer && body_ptr->buffer != rptr->buffer) {
+        DEQ_REMOVE_HEAD(decoder->incoming);
+        DEQ_INSERT_TAIL(blist, body_ptr->buffer);
+        octets += qd_buffer_cursor(body_ptr->buffer) - body_ptr->cursor;
+        body_ptr->buffer = DEQ_HEAD(decoder->incoming);
+        body_ptr->cursor = qd_buffer_base(body_ptr->buffer);
+    }
+
+    // invariant:
+    assert(octets <= body_ptr->remaining);
+    body_ptr->remaining -= octets;
+
+    if (flush && body_ptr->remaining) {
+        // need to copy out remaining body octets into new buffer
+        qd_buffer_t *tail = qd_buffer();
+
+        assert(body_ptr->remaining <= qd_buffer_capacity(tail));
+        memcpy(qd_buffer_cursor(tail), body_ptr->cursor, body_ptr->remaining);
+        qd_buffer_insert(tail, body_ptr->remaining);
+        DEQ_INSERT_TAIL(blist, tail);
+        octets += body_ptr->remaining;
+
+        *body_ptr = *rptr;
+        body_ptr->remaining = 0;
+    }
+
+    decoder->error = conn->config.xfer_rx_body(decoder->xfer, &blist, body_offset, octets);
+    return decoder->error;
+}
+
+
+
+// parsing the start of a chunked header:
+// <chunk size in hex>CRLF
+//
+static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t line;
+
+    assert(decoder->chunk_state == HTTP1_CHUNK_HEADER);
+    assert(decoder->chunk_length == 0);
+
+    if (read_line(rptr, &line)) {
+        decoder->body_ptr.remaining += line.remaining;
+
+        ensure_scratch_size(&decoder->scratch, line.remaining + 1);
+        uint8_t *ptr = decoder->scratch.buf;
+        pointer_2_str(&line, (unsigned char*) ptr, line.remaining + 1);
+        int rc = sscanf((char*) ptr, "%"SCNx64, &decoder->chunk_length);
+        if (rc != 1) {
+            decoder->error_msg = "Invalid chunk header";
+            decoder->error = (decoder->is_request) ? HTTP1_STATUS_BAD_REQ
+                : HTTP1_STATUS_SERVER_ERR;
+            return false;
+        }
+
+        if (decoder->chunk_length == 0) {
+            // last chunk
+            decoder->chunk_state = HTTP1_CHUNK_TRAILERS;
+        } else {
+            decoder->chunk_state = HTTP1_CHUNK_DATA;
+
+            // chunk_length does not include the CRLF trailer:
+            decoder->chunk_length += 2;
+        }
+
+
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for more input
+}
+
+
+// Parse the data section of a chunk
+//
+static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+    assert(decoder->chunk_state == HTTP1_CHUNK_DATA);
+
+    size_t skipped = skip_octets(rptr, decoder->chunk_length);
+    decoder->chunk_length -= skipped;
+    body_ptr->remaining += skipped;
+
+    if (decoder->chunk_length == 0) {
+        // end of chunk
+        decoder->chunk_state = HTTP1_CHUNK_HEADER;
+        consume_body_data(conn, false);
+    }
+
+    return !!rptr->remaining;
+}
+
+
+// Keep reading chunk trailers until the terminating empty line is read
+//
+static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+    qd_iterator_pointer_t line;
+
+    assert(decoder->chunk_state == HTTP1_CHUNK_TRAILERS);
+
+    if (read_line(rptr, &line)) {
+        body_ptr->remaining += line.remaining;
+        if (is_empty_line(&line)) {
+            // end of message
+            consume_body_data(conn, true);
+            decoder->state = HTTP1_MSG_STATE_DONE;
+        }
+
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for full line
+}
+
+
+// parse an incoming message body which is chunk encoded
+// Return True if there is more data pending to parse
+static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    bool more;
+    switch (decoder->chunk_state) {
+
+    case HTTP1_CHUNK_HEADER:
+        more = parse_body_chunked_header(conn, decoder);
+        break;
+
+    case HTTP1_CHUNK_DATA:
+        more = parse_body_chunked_data(conn, decoder);
+        break;
+
+    case HTTP1_CHUNK_TRAILERS:
+        more = parse_body_chunked_trailer(conn, decoder);
+        break;
+    } // end switch
+
+    return more;
+}
+
+
+// parse an incoming message body which is Content-Length bytes long
+//
+static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+    size_t skipped = skip_octets(rptr, decoder->content_length);
+    decoder->content_length -= skipped;
+    body_ptr->remaining += skipped;
+    bool eom = decoder->content_length == 0;
+
+    consume_body_data(conn, eom);
+    if (eom)
+        decoder->state = HTTP1_MSG_STATE_DONE;
+
+    return !!rptr->remaining;
+}
+
+
+static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    if (decoder->is_chunked)
+        return parse_body_chunked(conn, decoder);
+
+    if (decoder->content_length)
+        return parse_body_content(conn, decoder);
+
+    // otherwise no explict body size, so just keep passing the entire unparsed
+    // incoming chain along until the remote closes the connection
+    decoder->error = conn->config.xfer_rx_body(decoder->xfer,
+                                               &decoder->incoming,
+                                               decoder->read_ptr.cursor
+                                               - qd_buffer_base(decoder->read_ptr.buffer),
+                                               decoder->read_ptr.remaining);
+    if (decoder->error) {
+        decoder->error_msg = "xfer_rx_body callback error";
+        return false;
+    }
+
+    decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
+    DEQ_INIT(decoder->incoming);
+
+    return false;
+}
+
+
+// Called when incoming message is complete
+//
+static bool parse_done(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    http1_transfer_t *xfer = decoder->xfer;
+    bool is_response = !decoder->is_request;
+
+    if (!decoder->error) {
+        // signal the message receive is complete
+        conn->config.xfer_rx_done(xfer);
+
+        if (is_response) {   // request<->response transfer complete
+
+            // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+            if (IS_INFO_RESPONSE(xfer->response_code)) {
+                xfer->response_code = 0;
+            } else {
+                // The message exchange is complete
+                conn->config.xfer_done(xfer);
+                decoder->xfer = 0;
+                http1_transfer_free(xfer);
+            }
+        }
+
+        decoder_reset(decoder);
+        return !!decoder->read_ptr.remaining;
+    }
+    return false;
+}
+
+
+// Main decode loop.
+// Process received data until it is exhausted
+//
+static int decode_incoming(http1_conn_t *conn)
+{
+    struct decoder_t *decoder = &conn->decoder;
+    bool more = true;
+    while (more && !decoder->error) {
+
+        if (decoder->state == HTTP1_MSG_STATE_START)
+            more = parse_start_line(conn, decoder);
+
+        else if (decoder->state == HTTP1_MSG_STATE_HEADERS)
+            more = parse_header(conn, decoder);
+
+        else if (decoder->state ==  HTTP1_MSG_STATE_BODY)
+            more = parse_body(conn, decoder);
+
+        // Can reach DONE from any call above.
+        if (decoder->state == HTTP1_MSG_STATE_DONE)
+            more = parse_done(conn, decoder);
+    }
+
+    return decoder->error;
+}
+
+
+void *http1_connection_get_context(http1_conn_t *conn)
+{
+    return conn->context;
+}
+
+// Push inbound network data into the http1 protocol engine.
+//
+// All of the xfer_rx callback will occur in the context of this call. This
+// returns zero on success otherwise an error code.  Any error occuring during
+// a callback will be reflected in the return value of this function.  It is
+// expected that the caller will call http1_connection_close on a non-zero
+// return value.
+//
+int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len)
+{
+    struct decoder_t *decoder = &conn->decoder;
+    bool init_ptrs = DEQ_IS_EMPTY(decoder->incoming);
+
+    DEQ_APPEND(decoder->incoming, *data);
+
+    if (init_ptrs) {
+        decoder->read_ptr.buffer = DEQ_HEAD(decoder->incoming);
+        decoder->read_ptr.cursor = qd_buffer_base(decoder->read_ptr.buffer);
+        decoder->read_ptr.remaining = len;
+
+        if (decoder->state == HTTP1_MSG_STATE_BODY) {
+            decoder->body_ptr = decoder->read_ptr;
+            decoder->body_ptr.remaining = 0;
+        }
+    } else {
+        decoder->read_ptr.remaining += len;
+    }
+
+    return decode_incoming(conn);
+}
+
+void http1_transfer_set_context(http1_transfer_t *xfer, void *context)
+{
+    xfer->context = context;
+}
+
+void *http1_transfer_get_context(const http1_transfer_t *xfer)
+{
+    return xfer->context;
+}
+
+http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer)
+{
+    return xfer->conn;
+}
+
+
+// initiate a new HTTP request.  This creates a new transfer.
+// request = <method>SP<target>SP<version>CRLF
+// Expects version to be in the format "HTTP/X.Y"
+//
+http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version)
+{
+    struct encoder_t *encoder = &conn->encoder;
+    assert(!encoder->xfer);   // error: transfer already in progress
+    assert(conn->config.type == HTTP1_CONN_SERVER);
+
+    http1_transfer_t *xfer = encoder->xfer = http1_transfer(conn);
+    encoder->is_request = true;
+    encoder->crlf_sent = false;
+
+    if (strcmp((char*)method, "HEAD") == 0)
+        xfer->is_head_method = true;
+    else if (strcmp((char*)method, "CONNECT") == 0)
+        xfer->is_connect_method = true;
+
+    write_string(encoder, method);
+    write_string(encoder, " ");
+    write_string(encoder, target);
+    write_string(encoder, " ");
+    write_string(encoder, version);
+    write_string(encoder, CRLF);
+
+    return xfer;
+}
+
+
+// Send an HTTP response msg.  xfer must correspond to the "oldest" outstanding
+// request that arrived via the xfer_rx_request callback for this connection.
+// version is expected to be in the form "HTTP/x.y"
+// status_code is expected to be 100 <= status_code <= 999
+// status-line = HTTP-version SP status-code SP reason-phrase CRLF
+//
+int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    assert(conn->config.type == HTTP1_CONN_CLIENT);
+    assert(!encoder->xfer);   // error: transfer already in progress
+    assert(DEQ_HEAD(conn->xfers) == xfer);   // error: response not in order!
+    assert(xfer->response_code == 0);
+
+    encoder->xfer = xfer;
+    encoder->is_request = false;
+    encoder->crlf_sent = false;
+    xfer->response_code = status_code;
+
+    char code_str[32];
+    snprintf(code_str, 32, "%d", status_code);
+
+    write_string(encoder, version);
+    write_string(encoder, " ");
+    write_string(encoder, code_str);
+    if (reason_phrase) {
+        write_string(encoder, " ");
+        write_string(encoder, reason_phrase);
+    }
+    write_string(encoder, CRLF);
+
+    return 0;
+}
+
+
+// Add a header field to an outgoing message
+// header-field   = field-name ":" OWS field-value OWS
+int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+    assert(encoder->xfer == xfer);  // xfer not current transfer
+
+    write_string(encoder, key);
+    write_string(encoder, ": ");
+    write_string(encoder, value);
+    write_string(encoder, CRLF);
+
+    // check to see if there are any full buffers that can be sent.
+
+    qd_buffer_list_t blist = DEQ_EMPTY;
+    qd_buffer_t *buf = DEQ_HEAD(encoder->outgoing);
+    size_t octets = 0;
+    while (buf && buf != encoder->write_ptr.buffer) {
+        DEQ_REMOVE_HEAD(encoder->outgoing);
+        DEQ_INSERT_TAIL(blist, buf);
+        octets += qd_buffer_size(buf);
+    }
+    if (!DEQ_IS_EMPTY(blist))
+        conn->config.conn_tx_data(conn, &blist, 0, octets);
+
+    return 0;
+}
+
+
+// just forward the body chain along
+int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    fprintf(stderr, "http1_tx_body(offset=%zu size=%zu)\n", offset, len);
+    
+    if (!encoder->crlf_sent) {
+        // need to terminate any headers by sending the plain CRLF that follows
+        // the headers
+        write_string(encoder, CRLF);
+
+        // flush all pending output.  From this point out the outgoing queue is
+        // no longer used for this message
+        fprintf(stderr, "Flushing before body: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
+        conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+        DEQ_INIT(encoder->outgoing);
+        encoder->write_ptr = NULL_I_PTR;
+        encoder->crlf_sent = true;
+    }
+
+    // skip the outgoing queue and send directly
+    fprintf(stderr, "Sending body data %zu bytes\n", len);
+    conn->config.conn_tx_data(conn, data, offset, len);
+
+    return 0;
+}
+
+
+int http1_tx_done(http1_transfer_t *xfer)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    if (!encoder->crlf_sent) {
+        // need to send the plain CRLF that follows the headers
+        write_string(encoder, CRLF);
+
+        // flush all pending output.
+
+        fprintf(stderr, "Flushing at tx_done: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
+        conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+        DEQ_INIT(encoder->outgoing);
+        encoder->write_ptr = NULL_I_PTR;
+        encoder->crlf_sent = true;
+    }
+
+    bool is_response = !encoder->is_request;
+    encoder_reset(encoder);
+
+    if (is_response) {
+        if (IS_INFO_RESPONSE(xfer->response_code)) {
+            // this is a non-terminal response. Another response is expected
+            // for this request so just reset the transfer state
+            xfer->response_code = 0;
+        } else {
+            // The message exchange is complete
+            conn->config.xfer_done(xfer);
+            http1_transfer_free(xfer);
+        }
+    }
+
+    return 0;
+}
+
+
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 86708c8..0b70459 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -30,6 +30,7 @@
 
 #include "qpid/dispatch/protocol_adaptor.h"
 #include "delivery.h"
+#include "http_common.h"
 #include "http_adaptor.h"
 
 const char *PATH = ":path";
@@ -49,8 +50,6 @@ const char *CONTENT_ENCODING = "content-encoding";
         NGHTTP2_NV_FLAG_NONE                                                   \
 }
 
-ALLOC_DEFINE(qd_http_lsnr_t);
-ALLOC_DEFINE(qd_http_connector_t);
 ALLOC_DEFINE(qdr_http2_session_data_t);
 ALLOC_DEFINE(qdr_http2_stream_data_t);
 
@@ -554,26 +553,6 @@ static void grant_read_buffers(qdr_http_connection_t *conn)
 }
 
 
-static void free_bridge_config(qd_bridge_config_t *config)
-{
-    if (!config) {
-        return;
-    }
-    free(config->host);
-    free(config->port);
-    free(config->name);
-    free(config->address);
-    free(config->host_port);
-}
-
-void qd_http_listener_decref(qd_http_lsnr_t* li)
-{
-    if (li && sys_atomic_dec(&li->ref_count) == 1) {
-        free_bridge_config(&li->config);
-        free_qd_http_lsnr_t(li);
-    }
-}
-
 static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
 {
 }
@@ -892,18 +871,8 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
     return 0;
 }
 
-void qd_http_connector_decref(qd_http_connector_t* c)
+void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector)
 {
-    if (c && sys_atomic_dec(&c->ref_count) == 1) {
-        free_bridge_config(&c->config);
-        free_qd_http_connector_t(c);
-    }
-}
-
-
-void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
-{
-    qd_http_connector_t *connector = (qd_http_connector_t*) impl;
     if (connector) {
         //TODO: cleanup and close any associated active connections
         DEQ_REMOVE(http_adaptor->connectors, connector);
@@ -1102,69 +1071,25 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c
 }
 
 
-static qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server)
-{
-    qd_http_lsnr_t *li = new_qd_http_lsnr_t();
-    if (!li)
-        return 0;
-    ZERO(li);
-    sys_atomic_init(&li->ref_count, 1);
-    li->server = server;
-    li->context.context = li;
-    li->context.handler = &handle_listener_event;
-    return li;
-}
-
-
-#define CHECK() if (qd_error_code()) goto error
-
-
 static const int BACKLOG = 50;  /* Listening backlog */
 
 static bool http_listener_listen(qd_http_lsnr_t *li) {
-   li->pn_listener = pn_listener();
-    if (li->pn_listener) {
-        pn_listener_set_context(li->pn_listener, &li->context);
-        pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
-        sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
-        /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
-    } else {
-        qd_log(http_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s",
-               li->config.host_port);
-     }
+    pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
+    sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+    /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
     return li->pn_listener;
 }
 
-static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity)
-{
-    qd_error_clear();
-    ZERO(config);
-
-    config->name                 = qd_entity_get_string(entity, "name");              CHECK();
-    config->host                 = qd_entity_get_string(entity, "host");              CHECK();
-    config->port                 = qd_entity_get_string(entity, "port");              CHECK();
-    config->address              = qd_entity_get_string(entity, "address");           CHECK();
-
-    int hplen = strlen(config->host) + strlen(config->port) + 2;
-    config->host_port = malloc(hplen);
-    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
-
-    return QD_ERROR_NONE;
-
- error:
-    free_bridge_config(config);
-    return qd_error_code();
-}
-
 
-qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
-    qd_http_lsnr_t *li = qd_http_lsnr(qd->server);
-    if (!li || load_bridge_config(qd, &li->config, entity) != QD_ERROR_NONE) {
-        qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: %s", qd_error_message());
-        qd_http_listener_decref(li);
+    qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &handle_listener_event);
+    if (!li) {
+        qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: no memory");
         return 0;
     }
+
+    li->config = *config;
     //DEQ_ITEM_INIT(li);
     DEQ_INSERT_TAIL(http_adaptor->listeners, li);
     qd_log(http_adaptor->log_source, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port);
@@ -1173,16 +1098,12 @@ qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *
 }
 
 
-static qd_http_connector_t *qd_http_connector(qd_server_t *server)
+void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener)
 {
-    qd_http_connector_t *c = new_qd_http_connector_t();
-    if (!c) return 0;
-    ZERO(c);
-    sys_atomic_init(&c->ref_count, 1);
-    c->server      = server;
-    return c;
+    // TBD?
 }
 
+
 static void on_activate(void *context)
 {
     qdr_http_connection_t* conn = (qdr_http_connection_t*) context;
@@ -1278,20 +1199,18 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
 }
 
 
-
-qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
     qd_http_connector_t *c = qd_http_connector(qd->server);
-    if (!c || load_bridge_config(qd, &c->config, entity) != QD_ERROR_NONE) {
-        qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message());
-        qd_http_connector_decref(c);
+    if (!c) {
+        qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http connector: no memory");
         return 0;
     }
+    c->config = *config;
     DEQ_ITEM_INIT(c);
     DEQ_INSERT_TAIL(http_adaptor->connectors, c);
     qdr_http_connection_egress(c);
     return c;
-
 }
 
 static void qdr_http_adaptor_final(void *adaptor_context)
@@ -1302,17 +1221,6 @@ static void qdr_http_adaptor_final(void *adaptor_context)
     http_adaptor =  NULL;
 }
 
-qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
-{
-    return QD_ERROR_NONE;
-}
-
-
-qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
-{
-    return QD_ERROR_NONE;
-}
-
 /**
  * This initialization function will be invoked when the router core is ready for the protocol
  * adaptor to be created.  This function must:
@@ -1341,7 +1249,7 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
                                             qdr_http_delivery_update,
                                             qdr_http_conn_close,
                                             qdr_http_conn_trace);
-    adaptor->log_source  = qd_log_source("HTTP_ADAPTOR");
+    adaptor->log_source = qd_log_source(QD_HTTP_LOG_SOURCE);
     adaptor->protocol_log_source = qd_log_source("PROTOCOL");
     *adaptor_context = adaptor;
     DEQ_INIT(adaptor->listeners);
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 5982622..2f0134a 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -25,44 +25,16 @@
 #include <qpid/dispatch/log.h>
 #include <nghttp2/nghttp2.h>
 
+
 // We already have a qd_http_listener_t defined in http-libwebsockets.c
 // We will call this as qd_http_lsnr_t in order to avoid a clash.
 // At a later point in time, we will handle websocket here as well
 // and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
-typedef struct qd_http_lsnr_t           qd_http_lsnr_t;
-typedef struct qd_http_connector_t      qd_http_connector_t;
 typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
-typedef struct qd_bridge_config_t       qd_bridge_config_t;
 typedef struct qdr_http2_stream_data_t  qdr_http2_stream_data_t;
 typedef struct qdr_http_connection_t    qdr_http_connection_t;
 DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
 
-struct qd_bridge_config_t {
-    char *name;
-    char *host;
-    char *port;
-    char *address;
-    char *host_port;
-};
-
-struct qd_http_lsnr_t {
-    qd_handler_context_t       context;
-    sys_atomic_t               ref_count;
-    qd_server_t               *server;
-    qd_bridge_config_t         config;
-    pn_listener_t             *pn_listener;
-    DEQ_LINKS(qd_http_lsnr_t);
-};
-
-struct qd_http_connector_t {
-    sys_atomic_t              ref_count;
-    qd_server_t              *server;
-    qd_bridge_config_t        config;
-    qd_timer_t               *timer;
-    long                      delay;
-    DEQ_LINKS(qd_http_connector_t);
-};
-
 struct qdr_http2_session_data_t {
     qd_http2_stream_data_list_t  streams;    // A session can have many streams.
     nghttp2_session             *session;    // A pointer to the nghttp2s' session object
@@ -110,7 +82,7 @@ struct qdr_http_connection_t {
     pn_raw_buffer_t          read_buffers[4];
     bool                     ingress;
     qd_timer_t              *activate_timer;
-    qd_bridge_config_t      *config;
+    qd_http_bridge_config_t *config;
     qd_server_t             *server;
     uint64_t                 conn_id;
 
@@ -126,11 +98,7 @@ struct qdr_http_connection_t {
     nghttp2_data_provider    data_prd;
 };
 
-DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t);
-ALLOC_DECLARE(qd_http_lsnr_t);
 ALLOC_DECLARE(qdr_http2_session_data_t);
-ALLOC_DECLARE(qd_http_connector_t);
 ALLOC_DECLARE(qdr_http2_stream_data_t);
-DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
 
 
diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c
new file mode 100644
index 0000000..b3a6fc9
--- /dev/null
+++ b/src/adaptors/http_common.c
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http_common.h"
+
+#include <proton/listener.h>
+
+#include <stdio.h>
+
+ALLOC_DECLARE(qd_http_lsnr_t);
+ALLOC_DEFINE(qd_http_lsnr_t);
+ALLOC_DECLARE(qd_http_connector_t);
+ALLOC_DEFINE(qd_http_connector_t);
+
+
+static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity)
+{
+    char *version_str = 0;
+
+    qd_error_clear();
+    ZERO(config);
+
+#define CHECK() if (qd_error_code()) goto error
+    config->name    = qd_entity_get_string(entity, "name");            CHECK();
+    config->host    = qd_entity_get_string(entity, "host");            CHECK();
+    config->port    = qd_entity_get_string(entity, "port");            CHECK();
+    config->address = qd_entity_get_string(entity, "address");         CHECK();
+    version_str     = qd_entity_get_string(entity, "protcolVersion");  CHECK();
+
+    if (strncmp(version_str, "HTTP/1", 6) == 0) {
+        config->version = VERSION_HTTP1;
+    } else {
+        config->version = VERSION_HTTP2;
+    }
+    free(version_str);
+    version_str = 0;
+
+    int hplen = strlen(config->host) + strlen(config->port) + 2;
+    config->host_port = malloc(hplen);
+    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
+
+    return QD_ERROR_NONE;
+
+error:
+    qd_http_free_bridge_config(config);
+    free(version_str);
+    return qd_error_code();
+}
+
+
+void qd_http_free_bridge_config(qd_http_bridge_config_t *config)
+{
+    if (!config) {
+        return;
+    }
+    free(config->host);
+    free(config->port);
+    free(config->name);
+    free(config->address);
+    free(config->host_port);
+}
+
+
+//
+// HTTP Listener Management (HttpListenerEntity)
+//
+
+
+qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_http_lsnr_t *listener = 0;
+    qd_http_bridge_config_t config;
+
+    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
+        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
+               "Unable to create http listener: %s", qd_error_message());
+        return 0;
+    }
+
+    switch (config.version) {
+    case VERSION_HTTP1:
+        listener = qd_http1_configure_listener(qd, &config, entity);
+        break;
+    case VERSION_HTTP2:
+        listener = qd_http2_configure_listener(qd, &config, entity);
+        break;
+    }
+
+    if (!listener)
+        qd_http_free_bridge_config(&config);
+
+    return listener;
+}
+
+
+void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl)
+{
+    qd_http_lsnr_t *listener = (qd_http_lsnr_t*) impl;
+    if (listener) {
+        switch (listener->config.version) {
+        case VERSION_HTTP1:
+            qd_http1_delete_listener(qd, listener);
+            break;
+        case VERSION_HTTP2:
+            qd_http2_delete_listener(qd, listener);
+            break;
+        }
+    }
+}
+
+
+qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
+}
+
+
+//
+// HTTP Connector Management (HttpConnectorEntity)
+//
+
+
+qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_http_connector_t *conn = 0;
+    qd_http_bridge_config_t config;
+
+    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
+        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
+               "Unable to create http connector: %s", qd_error_message());
+        return 0;
+    }
+
+    switch (config.version) {
+    case VERSION_HTTP1:
+        conn = qd_http1_configure_connector(qd, &config, entity);
+        break;
+    case VERSION_HTTP2:
+        conn = qd_http2_configure_connector(qd, &config, entity);
+        break;
+    }
+
+    if (!conn)
+        qd_http_free_bridge_config(&config);
+
+    return conn;
+}
+
+
+void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
+{
+    qd_http_connector_t *conn = (qd_http_connector_t*) impl;
+
+    if (conn) {
+        switch (conn->config.version) {
+        case VERSION_HTTP1:
+            qd_http1_delete_connector(qd, conn);
+            break;
+        case VERSION_HTTP2:
+            qd_http2_delete_connector(qd, conn);
+            break;
+        }
+    }
+}
+
+qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
+}
+
+//
+// qd_http_lsnr_t constructor
+//
+
+qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler)
+{
+    qd_http_lsnr_t *li = new_qd_http_lsnr_t();
+    if (!li)
+        return 0;
+    ZERO(li);
+
+    li->pn_listener = pn_listener();
+    if (!li->pn_listener) {
+        free_qd_http_lsnr_t(li);
+        return 0;
+    }
+
+    sys_atomic_init(&li->ref_count, 1);
+    li->server = server;
+    li->context.context = li;
+    li->context.handler = handler;
+    pn_listener_set_context(li->pn_listener, &li->context);
+
+    return li;
+}
+
+void qd_http_listener_decref(qd_http_lsnr_t* li)
+{
+    if (li && sys_atomic_dec(&li->ref_count) == 1) {
+        qd_http_free_bridge_config(&li->config);
+        free_qd_http_lsnr_t(li);
+    }
+}
+
+//
+// qd_http_connector_t constructor
+//
+
+qd_http_connector_t *qd_http_connector(qd_server_t *server)
+{
+    qd_http_connector_t *c = new_qd_http_connector_t();
+    if (!c) return 0;
+    ZERO(c);
+    sys_atomic_init(&c->ref_count, 1);
+    c->server      = server;
+    return c;
+}
+
+void qd_http_connector_decref(qd_http_connector_t* c)
+{
+    if (c && sys_atomic_dec(&c->ref_count) == 1) {
+        qd_http_free_bridge_config(&c->config);
+        free_qd_http_connector_t(c);
+    }
+}
+
+
diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h
new file mode 100644
index 0000000..e8a7ff9
--- /dev/null
+++ b/src/adaptors/http_common.h
@@ -0,0 +1,108 @@
+#ifndef __http_common_h__
+#define __http_common_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/atomic.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/timer.h>
+
+#include "delivery.h"
+#include "entity.h"
+
+#define QD_HTTP_LOG_SOURCE "HTTP_ADAPTOR"
+
+typedef enum {
+    VERSION_HTTP1,
+    VERSION_HTTP2,
+} qd_http_version_t;
+
+typedef struct qd_http_bridge_config_t {
+    char              *name;
+    char              *host;
+    char              *port;
+    char              *address;
+    char              *host_port;
+    qd_http_version_t  version;
+} qd_http_bridge_config_t;
+
+void qd_http_free_bridge_config(qd_http_bridge_config_t *config);
+
+typedef struct qd_http_lsnr_t qd_http_lsnr_t;
+struct qd_http_lsnr_t {
+    qd_http_bridge_config_t    config;
+    qd_handler_context_t       context;
+    sys_atomic_t               ref_count;
+    qd_server_t               *server;
+    pn_listener_t             *pn_listener;
+    DEQ_LINKS(qd_http_lsnr_t);
+};
+DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t);
+
+qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler);
+void qd_http_listener_decref(qd_http_lsnr_t* li);
+
+typedef struct qd_http_connector_t qd_http_connector_t;
+struct qd_http_connector_t {
+    qd_http_bridge_config_t       config;
+    sys_atomic_t                  ref_count;
+    qd_server_t                  *server;
+    qd_timer_t                   *timer;
+    long                          delay;
+    struct qdr_http_connection_t *dispatcher;  // pseudo egress connection
+    DEQ_LINKS(qd_http_connector_t);
+};
+DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
+
+qd_http_connector_t *qd_http_connector(qd_server_t *server);
+void qd_http_connector_decref(qd_http_connector_t* c);
+
+
+
+//
+// Management Entity Interfaces (see HttpListenerEntity and HttpConnectorEntity in agent.py)
+//
+
+qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity);
+void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl);
+qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl);
+
+qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity);
+void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl);
+qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl);
+
+//
+// These functions are defined in their respective HTTP adaptors:
+//
+
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+
+void qd_http1_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *);
+void qd_http2_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *);
+
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+
+void qd_http1_delete_connector(qd_dispatch_t *, qd_http_connector_t *);
+void qd_http2_delete_connector(qd_dispatch_t *, qd_http_connector_t *);
+
+
+#endif // __http_common_h__
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index efd102f..8553e4f 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -39,7 +39,7 @@ DUMMY = "org.apache.qpid.dispatch.dummy"
 
 CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
 
-TOTAL_ENTITIES=29   # for tests that check the total # of entities
+TOTAL_ENTITIES=31   # for tests that check the total # of entities
 
 
 class QdmanageTest(TestCase):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org