You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/08/10 18:28:03 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1744: refactor common HTTP code
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new c20b29a DISPATCH-1744: refactor common HTTP code
c20b29a is described below
commit c20b29a1f70a507fd04a26b2d1692aa522fdb9b1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Aug 6 10:22:35 2020 -0400
DISPATCH-1744: refactor common HTTP code
This closes #815
---
include/qpid/dispatch/http1_lib.h | 118 +++
python/qpid_dispatch/management/qdrouter.json | 20 +
src/CMakeLists.txt | 3 +
src/adaptors/http1/http1_adaptor.c | 56 +
src/adaptors/http1/http1_lib.c | 1377 +++++++++++++++++++++++++
src/adaptors/http_adaptor.c | 130 +--
src/adaptors/http_adaptor.h | 36 +-
src/adaptors/http_common.c | 243 +++++
src/adaptors/http_common.h | 108 ++
tests/system_tests_qdmanage.py | 2 +-
10 files changed, 1947 insertions(+), 146 deletions(-)
diff --git a/include/qpid/dispatch/http1_lib.h b/include/qpid/dispatch/http1_lib.h
new file mode 100644
index 0000000..58385e0
--- /dev/null
+++ b/include/qpid/dispatch/http1_lib.h
@@ -0,0 +1,118 @@
+#ifndef http1_lib_H
+#define http1_lib_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/dispatch/buffer.h>
+
+#include <inttypes.h>
+
+#define HTTP1_VERSION_1_1 "HTTP/1.1"
+#define HTTP1_VERSION_1_0 "HTTP/1.0"
+
+typedef struct http1_conn_t http1_conn_t;
+typedef struct http1_transfer_t http1_transfer_t;
+
+
+typedef enum {
+ HTTP1_CONN_CLIENT, // connection initiated by client
+ HTTP1_CONN_SERVER, // connection to server
+} http1_conn_type_t;
+
+typedef enum {
+ HTTP1_STATUS_BAD_REQ = 400,
+ HTTP1_STATUS_SERVER_ERR = 500,
+ HTTP1_STATUS_BAD_VERSION = 505,
+} http1_status_code_t;
+
+typedef struct http1_conn_config_t {
+
+ http1_conn_type_t type;
+
+ // called with output data to write to the network
+ void (*conn_tx_data)(http1_conn_t *conn, qd_buffer_list_t *data, size_t offset, unsigned int len);
+
+ // @TODO(kgiusti) - remove?
+ //void (*conn_error)(http1_conn_t *conn, int code, const char *reason);
+
+ //
+ // RX message callbacks
+ //
+
+ // HTTP request received - new transfer created (xfer). This xfer must be
+ // supplied in the http1_response() method
+ int (*xfer_rx_request)(http1_transfer_t *xfer,
+ const char *method,
+ const char *target,
+ const char *version);
+
+ // HTTP response received - the transfer comes from the return value of the
+ // corresponding http1_request method. Note well that if status_code is
+ // Informational (1xx) then this response is NOT the last response for the
+ // current request (See RFC7231, 6.2 Informational 1xx). The xfer_done
+ // callback will be called after the LAST response has been received.
+ //
+ int (*xfer_rx_response)(http1_transfer_t *xfer,
+ const char *version,
+ int status_code,
+ const char *reason_phrase);
+
+ int (*xfer_rx_header)(http1_transfer_t *xfer, const char *key, const char *value);
+ int (*xfer_rx_headers_done)(http1_transfer_t *xfer);
+
+ int (*xfer_rx_body)(http1_transfer_t *xfer, qd_buffer_list_t *body, size_t offset, size_t len);
+
+ void (*xfer_rx_done)(http1_transfer_t *xfer);
+
+ // Invoked when the request/response(s) exchange has completed
+ //
+ void (*xfer_done)(http1_transfer_t *xfer);
+} http1_conn_config_t;
+
+
+http1_conn_t *http1_connection(http1_conn_config_t *config, void *context);
+void http1_connection_close(http1_conn_t *conn);
+void *http1_connection_get_context(http1_conn_t *conn);
+
+// push inbound network data into the http1 library
+int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len);
+
+
+//
+// API for sending HTTP/1.1 messages
+//
+void http1_transfer_set_context(http1_transfer_t *xfer, void *context);
+void *http1_transfer_get_context(const http1_transfer_t *xfer);
+http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer);
+
+// initiate a request - this creates a new message transfer context
+http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version);
+
+// respond to a received request - the transfer context should be from the corresponding xfer_rx_request callback
+int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase);
+
+int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value);
+int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len);
+int http1_tx_done(http1_transfer_t *xfer);
+
+
+
+
+
+#endif // http1_lib_H
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index dc1dcb7..421fa0a 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1103,6 +1103,16 @@
"required": false,
"description": "Name of the sslProfile..",
"create": true
+ },
+ "protocolVersion": {
+ "description": "The version of the HTTP protocol supported by this listener.",
+ "type": [
+ "HTTP/1.x",
+ "HTTP/2.0"
+ ],
+ "default": "HTTP/1.x",
+ "required": false,
+ "create": true
}
}
},
@@ -1127,6 +1137,16 @@
"type": "string",
"create": true
+ },
+ "protocolVersion": {
+ "description": "The version of the HTTP protocol supported by this connector.",
+ "type": [
+ "HTTP/1.x",
+ "HTTP/2.0"
+ ],
+ "default": "HTTP/1.x",
+ "required": false,
+ "create": true
}
}
},
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 753436b..f7ac67e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,7 +39,10 @@ add_custom_command (
# Build the qpid-dispatch library.
set(qpid_dispatch_SOURCES
adaptors/reference_adaptor.c
+ adaptors/http_common.c
adaptors/http_adaptor.c
+ adaptors/http1/http1_lib.c
+ adaptors/http1/http1_adaptor.c
alloc_pool.c
amqp.c
bitmask.c
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
new file mode 100644
index 0000000..beb9eb4
--- /dev/null
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/http1_lib.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+#include <qpid/dispatch/message.h>
+#include "adaptors/http_common.h"
+
+#include <stdio.h>
+#include <inttypes.h>
+
+
+typedef struct qd_http1_adaptor_t {
+ qdr_core_t *core;
+ qdr_protocol_adaptor_t *adaptor;
+ qd_http_lsnr_list_t listeners;
+ qd_http_connector_list_t connectors;
+ qd_log_source_t *log;
+} qd_http1_adaptor_t;
+
+//static qd_http1_adaptor_t *http1_adaptor;
+
+#define BUFF_BATCH 16
+
+
+// dummy for now:
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+ return 0;
+}
+
+void qd_http1_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) {}
+
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+ return 0;
+}
+
+void qd_http1_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *conn) {}
+
diff --git a/src/adaptors/http1/http1_lib.c b/src/adaptors/http1/http1_lib.c
new file mode 100644
index 0000000..991d492
--- /dev/null
+++ b/src/adaptors/http1/http1_lib.c
@@ -0,0 +1,1377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/dispatch/http1_lib.h>
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/alloc_pool.h>
+
+#include <ctype.h>
+#include <stdio.h>
+#include <string.h>
+
+
+const uint8_t CR_TOKEN = '\r';
+const uint8_t LF_TOKEN = '\n';
+const char *CRLF = "\r\n";
+
+const qd_iterator_pointer_t NULL_I_PTR = {0};
+
+// true for informational response codes
+#define IS_INFO_RESPONSE(code) ((code) / 100 == 1)
+
+typedef enum {
+ HTTP1_MSG_STATE_START = 0, // parsing start-line
+ HTTP1_MSG_STATE_HEADERS, // parsing headers
+ HTTP1_MSG_STATE_BODY, // parsing body
+ HTTP1_MSG_STATE_DONE, // parsing complete
+} http1_msg_state_t;
+
+
+typedef enum {
+ HTTP1_CHUNK_HEADER = 0, // waiting for chunk header
+ HTTP1_CHUNK_DATA, // reading chunk data
+ HTTP1_CHUNK_TRAILERS, // reading until lone CRLF
+} http1_chunk_state_t;
+
+
+typedef struct scratch_buffer_t {
+ uint8_t *buf;
+ size_t size; // of buffer, not contents!
+} scratch_buffer_t;
+
+
+// state for a single request-response transaction
+//
+struct http1_transfer_t {
+ DEQ_LINKS(struct http1_transfer_t);
+ void *context;
+ http1_conn_t *conn;
+ uint32_t response_code;
+
+ bool close_on_done; // true if connection must be closed when transfer completes
+ bool is_head_method; // true if request method is HEAD
+ bool is_connect_method; // true if request method is CONNECT TODO(kgiusti): supported?
+};
+DEQ_DECLARE(http1_transfer_t, http1_transfer_list_t);
+ALLOC_DECLARE(http1_transfer_t);
+ALLOC_DEFINE(http1_transfer_t);
+
+
+// The HTTP/1.1 connection
+//
+struct http1_conn_t {
+ void *context;
+
+ // http requests are added to tail,
+ // in-progress response is at head
+ http1_transfer_list_t xfers;
+
+ // Decoder for current incoming msg.
+ //
+ // incoming: holds the raw data received by the proactor from this
+ // connection.
+ //
+ // read_ptr: points to the next octet to be decoded on the incoming buffer
+ // list. Remaining is the length of the raw data to be decoded.
+ //
+ // body_ptr: points to the first unconsumed octet of the message
+ // body. Remaining is the number of octets that may be consumed.
+ // Invariant: body_ptr.buffer always points to the incoming.head as body
+ // data is being parsed.
+ //
+ struct decoder_t {
+ qd_buffer_list_t incoming;
+ qd_iterator_pointer_t read_ptr;
+ qd_iterator_pointer_t body_ptr;
+
+ http1_transfer_t *xfer; // current transfer
+ http1_msg_state_t state;
+ scratch_buffer_t scratch;
+ int error;
+ const char *error_msg;
+
+ uint64_t content_length;
+ http1_chunk_state_t chunk_state;
+ uint64_t chunk_length;
+
+ bool is_request;
+ bool is_chunked;
+ bool is_1_0;
+
+ // decoded headers
+ bool hdr_transfer_encoding;
+ bool hdr_content_length;
+ } decoder;
+
+ // Encoder for current outgoing msg.
+ // outgoing: holds the encoded data that needs to be sent to proactor for
+ // sending out this connection
+ // write_ptr: points to the first empty octet to be written to by the
+ // encoder. Remaining is the total unused space in the outgoing list
+ // (capacity)
+ // Note that the outgoing list and the write_ptr are only used for the
+ // start line and headers. Body content buffer chains are past directly to
+ // the connection without encoding.
+ //
+ struct encoder_t {
+ qd_buffer_list_t outgoing;
+ qd_iterator_pointer_t write_ptr;
+ http1_transfer_t *xfer; // current transfer
+
+ bool is_request;
+ bool crlf_sent; // true if the CRLF after headers has been sent
+ } encoder;
+
+ http1_conn_config_t config;
+};
+ALLOC_DECLARE(http1_conn_t);
+ALLOC_DEFINE(http1_conn_t);
+
+static void decoder_reset(struct decoder_t *d);
+static void encoder_reset(struct encoder_t *e);
+
+
+// Create a new transfer - this is done when a new http request occurs
+// Keep oldest outstanding tranfer at DEQ_HEAD(conn->xfers)
+static http1_transfer_t *http1_transfer(http1_conn_t *conn)
+{
+ http1_transfer_t *xfer = new_http1_transfer_t();
+ ZERO(xfer);
+ xfer->conn = conn;
+ DEQ_INSERT_TAIL(conn->xfers, xfer);
+ return xfer;
+}
+
+
+static void http1_transfer_free(http1_transfer_t *xfer)
+{
+ if (xfer) {
+ http1_conn_t *conn = xfer->conn;
+ assert(conn->decoder.xfer != xfer);
+ assert(conn->encoder.xfer != xfer);
+ DEQ_REMOVE(conn->xfers, xfer);
+ free_http1_transfer_t(xfer);
+ }
+}
+
+
+http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
+{
+ http1_conn_t *conn = new_http1_conn_t();
+ ZERO(conn);
+
+ conn->context = context;
+ conn->config = *config;
+ DEQ_INIT(conn->xfers);
+
+ encoder_reset(&conn->encoder);
+ DEQ_INIT(conn->encoder.outgoing);
+ conn->encoder.write_ptr = NULL_I_PTR;
+
+ decoder_reset(&conn->decoder);
+ DEQ_INIT(conn->decoder.incoming);
+ conn->decoder.read_ptr = NULL_I_PTR;
+
+ return conn;
+}
+
+
+// Close the connection conn.
+//
+// This cancels all outstanding transfers and destroys the connection. If
+// there is an incoming response message body being parsed when this function
+// is invoked it will signal the end of the message.
+//
+void http1_connection_close(http1_conn_t *conn)
+{
+ if (conn) {
+ struct decoder_t *decoder = &conn->decoder;
+ if (!decoder->error) {
+ if (decoder->state == HTTP1_MSG_STATE_BODY
+ && decoder->xfer) {
+
+ // terminate the incoming message as the server has closed the
+ // connection to indicate the end of the message
+ conn->config.xfer_rx_done(decoder->xfer);
+ }
+
+ // notify any outstanding transfers
+ http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+ while (xfer) {
+ conn->config.xfer_done(xfer);
+ xfer = DEQ_NEXT(xfer);
+ }
+ }
+
+ decoder_reset(&conn->decoder);
+ encoder_reset(&conn->encoder);
+ qd_buffer_list_free_buffers(&conn->decoder.incoming);
+ qd_buffer_list_free_buffers(&conn->encoder.outgoing);
+ free(conn->decoder.scratch.buf);
+
+ http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+ while (xfer) {
+ http1_transfer_free(xfer); // removes from conn->xfers list
+ xfer = DEQ_HEAD(conn->xfers);
+ }
+
+ free_http1_conn_t(conn);
+ }
+}
+
+
+// reset the rx decoder state after message received
+//
+static void decoder_reset(struct decoder_t *decoder)
+{
+ // do not touch the read_ptr or incoming buffer list as they
+ // track the current position in the incoming data stream
+
+ decoder->body_ptr = NULL_I_PTR;
+ decoder->xfer = 0;
+ decoder->state = HTTP1_MSG_STATE_START;
+ decoder->content_length = 0;
+ decoder->chunk_state = HTTP1_CHUNK_HEADER;
+ decoder->chunk_length = 0;
+ decoder->error = 0;
+ decoder->error_msg = 0;
+
+ decoder->is_request = false;
+ decoder->is_chunked = false;
+ decoder->is_1_0 = false;
+
+ decoder->hdr_transfer_encoding = false;
+ decoder->hdr_content_length = false;
+}
+
+// reset the tx encoder after message sent
+static void encoder_reset(struct encoder_t *encoder)
+{
+ // do not touch the write_ptr or the outgoing queue as there may be more messages to send.
+ encoder->xfer = 0;
+ encoder->is_request = false;
+ encoder->crlf_sent = false;
+}
+
+
+// ensure the encoder has at least capacity octets available
+//
+static void ensure_outgoing_capacity(struct encoder_t *encoder, size_t capacity)
+{
+ while (encoder->write_ptr.remaining < capacity) {
+ qd_buffer_t *buf = qd_buffer();
+ DEQ_INSERT_TAIL(encoder->outgoing, buf);
+ encoder->write_ptr.remaining += qd_buffer_capacity(buf);
+ }
+ if (!encoder->write_ptr.buffer) {
+ encoder->write_ptr.buffer = DEQ_HEAD(encoder->outgoing);
+ encoder->write_ptr.cursor = qd_buffer_cursor(encoder->write_ptr.buffer);
+ }
+}
+
+
+// Write a C string to the encoder.
+//
+static void write_string(struct encoder_t *encoder, const char *string)
+{
+ size_t needed = strlen(string);
+ ensure_outgoing_capacity(encoder, needed);
+
+ qd_iterator_pointer_t *wptr = &encoder->write_ptr;
+ while (needed) {
+ if (qd_buffer_capacity(wptr->buffer) == 0) {
+ wptr->buffer = DEQ_NEXT(wptr->buffer);
+ wptr->cursor = qd_buffer_base(wptr->buffer);
+ }
+
+ size_t avail = MIN(needed, qd_buffer_capacity(wptr->buffer));
+ memcpy(wptr->cursor, string, avail);
+ qd_buffer_insert(wptr->buffer, avail);
+ wptr->cursor += avail;
+ wptr->remaining -= avail;
+ string += avail;
+ needed -= avail;
+ }
+}
+
+
+//
+static inline size_t skip_octets(qd_iterator_pointer_t *data, size_t amount)
+{
+ size_t count = 0;
+ amount = MIN(data->remaining, amount);
+ while (count < amount) {
+ if (data->cursor == qd_buffer_cursor(data->buffer)) {
+ data->buffer = DEQ_NEXT(data->buffer);
+ assert(data->buffer); // else data->remaining is bad
+ data->cursor = qd_buffer_base(data->buffer);
+ }
+ size_t available = qd_buffer_cursor(data->buffer) - data->cursor;
+ available = MIN(available, amount - count);
+ data->cursor += available;
+ count += available;
+ }
+ data->remaining -= amount;
+ return amount;
+}
+
+// consume next octet and advance the pointer
+static inline bool get_octet(qd_iterator_pointer_t *data, uint8_t *octet)
+{
+ if (data->remaining > 0) {
+ if (data->cursor == qd_buffer_cursor(data->buffer)) {
+ data->buffer = DEQ_NEXT(data->buffer);
+ data->cursor = qd_buffer_base(data->buffer);
+ }
+ *octet = *data->cursor;
+ data->cursor += 1;
+ data->remaining -= 1;
+ return true;
+ }
+ return false;
+}
+
+
+// True if line contains just "CRLF"
+//
+static bool is_empty_line(const qd_iterator_pointer_t *line)
+{
+ if (line->remaining == 2) {
+ qd_iterator_pointer_t tmp = *line;
+ uint8_t octet;
+ return (get_octet(&tmp, &octet) && octet == CR_TOKEN
+ && get_octet(&tmp, &octet) && octet == LF_TOKEN);
+ }
+ return false;
+}
+
+static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_pointer_t *ptr)
+{
+ qd_iterator_pointer_t tmp = *ptr;
+ fprintf(stdout, "%s '", prefix);
+ size_t len = MIN(tmp.remaining, 80);
+ uint8_t octet;
+ while (len-- > 0 && get_octet(&tmp, &octet)) {
+ fputc(octet, stdout);
+ }
+ fprintf(stdout, "%s'\n", (tmp.remaining) ? " <truncated>" : "");
+ fflush(stdout);
+}
+
+
+// read a CRLF terminated line starting at 'data'.
+// On success, 'data' is advanced to the octet following the LF and 'line' is
+// set to the read line (including trailing CRLF). Returns false if no CRLF found
+//
+static bool read_line(qd_iterator_pointer_t *data, qd_iterator_pointer_t *line)
+{
+ qd_iterator_pointer_t tmp = *data;
+
+ *line = *data;
+ line->remaining = 0;
+
+ bool eol = false;
+
+ uint8_t octet;
+ while (!eol && get_octet(&tmp, &octet)) {
+ line->remaining += 1;
+ if (octet == CR_TOKEN) {
+ if (get_octet(&tmp, &octet)) {
+ line->remaining += 1;
+ if (octet == LF_TOKEN) {
+ eol = true;
+ }
+ }
+ }
+ }
+
+ if (eol) {
+ *data = tmp;
+ return true;
+ } else {
+ *line = NULL_I_PTR;
+ return false;
+ }
+}
+
+
+static bool ensure_scratch_size(scratch_buffer_t *b, size_t required)
+{
+ if (b->size < required) {
+ if (b->buf)
+ free(b->buf);
+ b->size = required;
+ b->buf = malloc(b->size);
+ }
+
+ // @TODO(kgiusti): deal with malloc failure
+ return true;
+}
+
+
+// trims any optional whitespace characters at the start of 'line'
+// RFC7230 defines OWS as zero or more spaces or horizontal tabs
+//
+static void trim_whitespace(qd_iterator_pointer_t *line)
+{
+ qd_iterator_pointer_t ptr = *line;
+ size_t skip = 0;
+ uint8_t octet;
+ while (get_octet(&ptr, &octet) && isblank(octet))
+ skip += 1;
+ if (skip)
+ skip_octets(line, skip);
+}
+
+// copy out iterator to a buffer and null terminate. Return # of bytes written
+// to str including terminating null.
+static size_t pointer_2_str(const qd_iterator_pointer_t *line, unsigned char *str, size_t len)
+{
+ assert(len);
+ qd_iterator_pointer_t tmp = *line;
+ uint8_t *ptr = (uint8_t *)str;
+ len -= 1; // reserve for null terminator
+ while (len-- > 0 && get_octet(&tmp, ptr))
+ ++ptr;
+ *ptr++ = 0;
+ return ptr - (uint8_t *)str;
+}
+
+
+// Parse out a token as defined by RFC7230 and store the result in 'token'.
+// 'line' is advanced past the token. This is used for parsing fields that
+// RFC7230 defines as 'tokens'.
+//
+static bool parse_token(qd_iterator_pointer_t *line, qd_iterator_pointer_t *token)
+{
+ static const char *TOKEN_EXTRA = "!#$%&’*+-.^_‘|~";
+
+ trim_whitespace(line);
+ qd_iterator_pointer_t tmp = *line;
+ *token = tmp;
+ size_t len = 0;
+ uint8_t octet;
+ while (get_octet(&tmp, &octet)
+ && (('A' <= octet && octet <= 'Z') ||
+ ('a' <= octet && octet <= 'z') ||
+ ('0' <= octet && octet <= '9') ||
+ (strchr(TOKEN_EXTRA, octet)))) {
+ len++;
+ }
+
+ if (len) {
+ token->remaining = len;
+ skip_octets(line, len);
+ return true;
+ }
+ *token = NULL_I_PTR;
+ return false;
+}
+
+
+// Parse out a text field delineated by whitespace.
+// 'line' is advanced past the field.
+//
+static bool parse_field(qd_iterator_pointer_t *line, qd_iterator_pointer_t *field)
+{
+ trim_whitespace(line);
+ qd_iterator_pointer_t tmp = *line;
+ *field = tmp;
+ size_t len = 0;
+ uint8_t octet;
+ while (get_octet(&tmp, &octet) && !isspace(octet))
+ len++;
+
+ if (len) {
+ field->remaining = len;
+ skip_octets(line, len);
+ return true;
+ }
+ *field = NULL_I_PTR;
+ return false;
+}
+
+
+// parse the HTTP/1.1 request line:
+// "method SP request-target SP HTTP-version CRLF"
+//
+static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+{
+ qd_iterator_pointer_t method = {0};
+ qd_iterator_pointer_t target = {0};
+ qd_iterator_pointer_t version = {0};
+
+ if (!parse_token(line, &method) ||
+ !parse_field(line, &target) ||
+ !parse_field(line, &version)) {
+
+ decoder->error_msg = "Malformed request line";
+ decoder->error = HTTP1_STATUS_BAD_REQ;
+ return decoder->error;
+ }
+
+ // translate iterator pointers to C strings
+ ensure_scratch_size(&decoder->scratch, method.remaining + target.remaining + version.remaining + 3);
+ uint8_t *ptr = decoder->scratch.buf;
+ size_t avail = decoder->scratch.size;
+
+ uint8_t *method_str = ptr;
+ size_t offset = pointer_2_str(&method, method_str, avail);
+ ptr += offset;
+ avail -= offset;
+
+ uint8_t *target_str = ptr;
+ offset = pointer_2_str(&target, target_str, avail);
+ ptr += offset;
+ avail += offset;
+
+ uint8_t *version_str = ptr;
+ pointer_2_str(&version, version_str, avail);
+
+ uint32_t major = 0;
+ uint32_t minor = 0;
+ if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) {
+ decoder->error_msg = "Malformed version in request";
+ decoder->error = HTTP1_STATUS_BAD_REQ;
+ return decoder->error;
+ }
+
+ if (major != 1 || minor > 1) {
+ decoder->error_msg = "Unsupported HTTP version";
+ decoder->error = HTTP1_STATUS_BAD_VERSION;
+ return decoder->error;
+ }
+
+ http1_transfer_t *xfer = http1_transfer(conn);
+
+ // check for methods that do not support body content in the response:
+ if (strcmp((char*)method_str, "HEAD") == 0)
+ xfer->is_head_method = true;
+ else if (strcmp((char*)method_str, "CONNECT") == 0)
+ xfer->is_connect_method = true;
+
+ decoder->xfer = xfer;
+ decoder->is_request = true;
+ decoder->is_1_0 = (minor == 0);
+
+ decoder->error = conn->config.xfer_rx_request(xfer, (char*)method_str, (char*)target_str, (char*)version_str);
+ if (decoder->error)
+ decoder->error_msg = "xfer_rx_request callback error";
+ return decoder->error;
+}
+
+
+// parse the HTTP/1.1 response line
+// "HTTP-version SP status-code [SP reason-phrase] CRLF"
+//
+static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+{
+ qd_iterator_pointer_t version = {0};
+ qd_iterator_pointer_t status_code = {0};
+ qd_iterator_pointer_t reason = {0};
+
+ if (!parse_field(line, &version)
+ || !parse_field(line, &status_code)
+ || status_code.remaining != 3) {
+
+ decoder->error_msg = "Malformed response status line";
+ decoder->error = HTTP1_STATUS_SERVER_ERR;
+ return decoder->error;
+ }
+
+ // Responses arrive in the same order as requests are generated so this new
+ // response corresponds to head xfer
+ http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
+ if (!xfer) {
+ // receiving a response without a corresponding request
+ decoder->error_msg = "Spurious HTTP response received";
+ decoder->error = HTTP1_STATUS_SERVER_ERR;
+ return decoder->error;
+ }
+
+ assert(!decoder->xfer); // state machine violation
+ assert(xfer->response_code == 0);
+
+ decoder->xfer = xfer;
+
+ unsigned char code_str[4];
+ pointer_2_str(&status_code, code_str, 4);
+ xfer->response_code = atoi((char*) code_str);
+
+ // the reason phrase is optional, and may contain spaces
+
+ reason = *line;
+ if (reason.remaining >= 2) // expected for CRLF
+ reason.remaining -= 2;
+ trim_whitespace(&reason);
+
+ // convert to C strings
+ ensure_scratch_size(&decoder->scratch, version.remaining + reason.remaining + 2);
+ uint8_t *ptr = decoder->scratch.buf;
+ size_t avail = decoder->scratch.size;
+
+ uint8_t *version_str = ptr;
+ size_t offset = pointer_2_str(&version, version_str, avail);
+ ptr += offset;
+ avail -= offset;
+
+ uint8_t *reason_str = ptr;
+ offset = pointer_2_str(&reason, reason_str, avail);
+
+ uint32_t major = 0;
+ uint32_t minor = 0;
+ if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) {
+ decoder->error_msg = "Malformed version in response";
+ decoder->error = HTTP1_STATUS_SERVER_ERR;
+ return decoder->error;
+ }
+
+ if (major != 1 || minor > 1) {
+ decoder->error_msg = "Unsupported HTTP version";
+ decoder->error = HTTP1_STATUS_BAD_VERSION;
+ return decoder->error;
+ }
+
+ decoder->is_request = false;
+ decoder->is_1_0 = (minor == 0);
+
+ decoder->error = conn->config.xfer_rx_response(decoder->xfer, (char*)version_str,
+ xfer->response_code,
+ (offset) ? (char*)reason_str: 0);
+ if (decoder->error)
+ decoder->error_msg = "xfer_rx_response callback error";
+
+ return decoder->error;
+}
+
+
+// parse the first line of an incoming http message
+//
+static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t line;
+
+ if (read_line(rptr, &line)) {
+ debug_print_iterator_pointer("start line:", &line);
+
+ if (!is_empty_line(&line)) { // RFC7230: ignore any preceding CRLF
+ if (conn->config.type == HTTP1_CONN_CLIENT) {
+ parse_request_line(conn, decoder, &line);
+ } else {
+ parse_response_line(conn, decoder, &line);
+ }
+ conn->decoder.state = HTTP1_MSG_STATE_HEADERS;
+ }
+ return !!rptr->remaining;
+ }
+
+ return false; // pend for more input
+}
+
+//
+// Header parsing
+//
+
+// Called after the last incoming header was decoded and passed to the
+// application
+//
+static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ // Flush all buffers processed so far - no longer needed
+
+ qd_buffer_t *head = DEQ_HEAD(decoder->incoming);
+ while (head && head != decoder->read_ptr.buffer) {
+ DEQ_REMOVE_HEAD(decoder->incoming);
+ qd_buffer_free(head);
+ head = DEQ_HEAD(decoder->incoming);
+ }
+
+ // perform any post-headers validation:
+
+ if (decoder->is_request) {
+ if (decoder->hdr_transfer_encoding && !decoder->is_chunked) {
+ // RFC7230 Message Body Length: If a Transfer-Encoding header field
+ // is present in a request and the chunked transfer coding is not
+ // the final encoding, the message body length cannot be determined
+ // reliably; the server MUST respond with the 400 (Bad Request)
+ // status code and then close the connection.
+ decoder->error_msg = "Non-chunked Tranfer-Encoding in request";
+ decoder->error = HTTP1_STATUS_BAD_REQ;
+ return false;
+ }
+ }
+
+ decoder->error = conn->config.xfer_rx_headers_done(decoder->xfer);
+ if (decoder->error) {
+ decoder->error_msg = "xfer_rx_headers_done callback error";
+ return false;
+ }
+
+ // determine if a body is present (ref RFC7230 sec 3.3.3 Message Body Length)
+ bool has_body;
+ if (decoder->is_request) {
+ // an http request will have a body ONLY if either chunked transfer or
+ // non-zero Content-Length header was given.
+ has_body = (decoder->is_chunked || decoder->content_length);
+
+ } else {
+ // An HTTP response has a body if request method is NOT HEAD or CONNECT AND
+ // the response code indicates a body. A body will either have a specific
+ // size via Content-Length or chunked encoder, OR its length is unspecified
+ // and the message body is terminated by closing the connection.
+ //
+ http1_transfer_t *xfer = decoder->xfer;
+ has_body = !(xfer->is_head_method ||
+ xfer->is_connect_method ||
+ xfer->response_code == 204 || // No Content
+ xfer->response_code == 205 || // Reset Content
+ xfer->response_code == 304 || // Not Modified
+ IS_INFO_RESPONSE(xfer->response_code));
+ if (has_body) {
+ // no body if explicit Content-Length of zero
+ if (decoder->hdr_content_length && decoder->content_length == 0) {
+ has_body = false;
+ }
+ }
+ }
+
+ if (has_body) {
+ // start tracking the body buffer chain
+ decoder->body_ptr = decoder->read_ptr;
+ decoder->body_ptr.remaining = 0;
+ decoder->state = HTTP1_MSG_STATE_BODY;
+ } else {
+ decoder->state = HTTP1_MSG_STATE_DONE;
+ }
+
+ return !!decoder->read_ptr.remaining;
+}
+
+
+// process a received header to determine message body length, etc.
+//
+static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
+{
+ int parse_error = decoder->is_request ? HTTP1_STATUS_BAD_REQ : HTTP1_STATUS_SERVER_ERR;
+
+ if (strcasecmp("Content-Length", (char*) key) == 0) {
+ uint64_t old = decoder->content_length;
+ if (sscanf((char*)value, "%"PRIu64, &decoder->content_length) != 1) {
+ decoder->error_msg = "Malformed Content-Length header";
+ decoder->error = parse_error;
+ return decoder->error;
+ }
+ if (old && old != decoder->content_length) {
+ decoder->error_msg = "Invalid duplicate Content-Length header";
+ decoder->error = parse_error;
+ return decoder->error;
+ }
+ decoder->hdr_content_length = true;
+
+ } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
+ // check if "chunked" is present and it is the last item in the value
+ // string. And remember kids: coding type names are case insensitive!
+ // Also note "value" has already been trimmed of whitespace at both
+ // ends.
+ size_t len = strlen((char*)value);
+ if (len >= 7) { // 7 = strlen("chunked")
+ const char *ptr = ((char*) value) + len - 7;
+ decoder->is_chunked = strcasecmp("chunked", ptr) == 0;
+ }
+ decoder->hdr_transfer_encoding = true;
+ }
+
+ return 0;
+}
+
+
+// Parse out the header key and value
+//
+static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t line;
+ http1_transfer_t *xfer = decoder->xfer;
+ assert(xfer); // else state machine busted
+
+ if (read_line(rptr, &line)) {
+ debug_print_iterator_pointer("header:", &line);
+
+ if (is_empty_line(&line)) {
+ // end of headers
+ return process_headers_done(conn, decoder);
+ }
+
+ qd_iterator_pointer_t key = {0};
+
+ if (!parse_token(&line, &key)) {
+ decoder->error_msg = "Malformed Header";
+ decoder->error = (decoder->is_request) ? HTTP1_STATUS_BAD_REQ
+ : HTTP1_STATUS_SERVER_ERR;
+ return false;
+ }
+
+ // advance line past the ':'
+ uint8_t octet;
+ while (get_octet(&line, &octet) && octet != ':')
+ ;
+
+ // line now contains the value. convert to C strings and post callback
+ ensure_scratch_size(&decoder->scratch, key.remaining + line.remaining + 2);
+ uint8_t *ptr = decoder->scratch.buf;
+ size_t avail = decoder->scratch.size;
+
+ uint8_t *key_str = ptr;
+ size_t offset = pointer_2_str(&key, key_str, avail);
+ ptr += offset;
+ avail -= offset;
+
+ uint8_t *value_str = ptr;
+ pointer_2_str(&line, value_str, avail);
+
+ // trim whitespace on both ends of value
+ while (isspace(*value_str))
+ ++value_str;
+ ptr = value_str + strlen((char*) value_str);
+ while (ptr-- > value_str) {
+ if (!isspace(*ptr))
+ break;
+ *ptr = 0;
+ }
+
+ process_header(conn, decoder, key_str, value_str);
+
+ if (!decoder->error) {
+ decoder->error = conn->config.xfer_rx_header(xfer, (char *)key_str, (char *)value_str);
+ if (decoder->error)
+ decoder->error_msg = "xfer_rx_header callback error";
+ }
+
+ return !!rptr->remaining;
+ }
+
+ return false; // pend for more data
+}
+
+//
+// Chunked body encoding parser
+//
+
+
+// Pass message body data up to the application.
+//
+static inline int consume_body_data(http1_conn_t *conn, bool flush)
+{
+ struct decoder_t *decoder = &conn->decoder;
+ qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+
+ // shortcut: if no more data to parse send the entire incoming chain
+ if (rptr->remaining == 0) {
+
+ decoder->error = conn->config.xfer_rx_body(decoder->xfer, &decoder->incoming,
+ body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
+ body_ptr->remaining);
+ DEQ_INIT(decoder->incoming);
+ *body_ptr = NULL_I_PTR;
+ *rptr = NULL_I_PTR;
+ return decoder->error;
+ }
+
+ // The read pointer points to somewhere in the buffer chain that contains some
+ // unparsed data. Send any buffers preceding the current read pointer.
+ qd_buffer_list_t blist = DEQ_EMPTY;
+ size_t octets = 0;
+ const size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+
+ // invariant:
+ assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
+
+ while (body_ptr->buffer && body_ptr->buffer != rptr->buffer) {
+ DEQ_REMOVE_HEAD(decoder->incoming);
+ DEQ_INSERT_TAIL(blist, body_ptr->buffer);
+ octets += qd_buffer_cursor(body_ptr->buffer) - body_ptr->cursor;
+ body_ptr->buffer = DEQ_HEAD(decoder->incoming);
+ body_ptr->cursor = qd_buffer_base(body_ptr->buffer);
+ }
+
+ // invariant:
+ assert(octets <= body_ptr->remaining);
+ body_ptr->remaining -= octets;
+
+ if (flush && body_ptr->remaining) {
+ // need to copy out remaining body octets into new buffer
+ qd_buffer_t *tail = qd_buffer();
+
+ assert(body_ptr->remaining <= qd_buffer_capacity(tail));
+ memcpy(qd_buffer_cursor(tail), body_ptr->cursor, body_ptr->remaining);
+ qd_buffer_insert(tail, body_ptr->remaining);
+ DEQ_INSERT_TAIL(blist, tail);
+ octets += body_ptr->remaining;
+
+ *body_ptr = *rptr;
+ body_ptr->remaining = 0;
+ }
+
+ decoder->error = conn->config.xfer_rx_body(decoder->xfer, &blist, body_offset, octets);
+ return decoder->error;
+}
+
+
+
+// parsing the start of a chunked header:
+// <chunk size in hex>CRLF
+//
+static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t line;
+
+ assert(decoder->chunk_state == HTTP1_CHUNK_HEADER);
+ assert(decoder->chunk_length == 0);
+
+ if (read_line(rptr, &line)) {
+ decoder->body_ptr.remaining += line.remaining;
+
+ ensure_scratch_size(&decoder->scratch, line.remaining + 1);
+ uint8_t *ptr = decoder->scratch.buf;
+ pointer_2_str(&line, (unsigned char*) ptr, line.remaining + 1);
+ int rc = sscanf((char*) ptr, "%"SCNx64, &decoder->chunk_length);
+ if (rc != 1) {
+ decoder->error_msg = "Invalid chunk header";
+ decoder->error = (decoder->is_request) ? HTTP1_STATUS_BAD_REQ
+ : HTTP1_STATUS_SERVER_ERR;
+ return false;
+ }
+
+ if (decoder->chunk_length == 0) {
+ // last chunk
+ decoder->chunk_state = HTTP1_CHUNK_TRAILERS;
+ } else {
+ decoder->chunk_state = HTTP1_CHUNK_DATA;
+
+ // chunk_length does not include the CRLF trailer:
+ decoder->chunk_length += 2;
+ }
+
+
+ return !!rptr->remaining;
+ }
+
+ return false; // pend for more input
+}
+
+
+// Parse the data section of a chunk
+//
+static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+ assert(decoder->chunk_state == HTTP1_CHUNK_DATA);
+
+ size_t skipped = skip_octets(rptr, decoder->chunk_length);
+ decoder->chunk_length -= skipped;
+ body_ptr->remaining += skipped;
+
+ if (decoder->chunk_length == 0) {
+ // end of chunk
+ decoder->chunk_state = HTTP1_CHUNK_HEADER;
+ consume_body_data(conn, false);
+ }
+
+ return !!rptr->remaining;
+}
+
+
+// Keep reading chunk trailers until the terminating empty line is read
+//
+static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+ qd_iterator_pointer_t line;
+
+ assert(decoder->chunk_state == HTTP1_CHUNK_TRAILERS);
+
+ if (read_line(rptr, &line)) {
+ body_ptr->remaining += line.remaining;
+ if (is_empty_line(&line)) {
+ // end of message
+ consume_body_data(conn, true);
+ decoder->state = HTTP1_MSG_STATE_DONE;
+ }
+
+ return !!rptr->remaining;
+ }
+
+ return false; // pend for full line
+}
+
+
+// parse an incoming message body which is chunk encoded
+// Return True if there is more data pending to parse
+static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ bool more;
+ switch (decoder->chunk_state) {
+
+ case HTTP1_CHUNK_HEADER:
+ more = parse_body_chunked_header(conn, decoder);
+ break;
+
+ case HTTP1_CHUNK_DATA:
+ more = parse_body_chunked_data(conn, decoder);
+ break;
+
+ case HTTP1_CHUNK_TRAILERS:
+ more = parse_body_chunked_trailer(conn, decoder);
+ break;
+ } // end switch
+
+ return more;
+}
+
+
+// parse an incoming message body which is Content-Length bytes long
+//
+static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+ qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+ size_t skipped = skip_octets(rptr, decoder->content_length);
+ decoder->content_length -= skipped;
+ body_ptr->remaining += skipped;
+ bool eom = decoder->content_length == 0;
+
+ consume_body_data(conn, eom);
+ if (eom)
+ decoder->state = HTTP1_MSG_STATE_DONE;
+
+ return !!rptr->remaining;
+}
+
+
+static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ if (decoder->is_chunked)
+ return parse_body_chunked(conn, decoder);
+
+ if (decoder->content_length)
+ return parse_body_content(conn, decoder);
+
+ // otherwise no explict body size, so just keep passing the entire unparsed
+ // incoming chain along until the remote closes the connection
+ decoder->error = conn->config.xfer_rx_body(decoder->xfer,
+ &decoder->incoming,
+ decoder->read_ptr.cursor
+ - qd_buffer_base(decoder->read_ptr.buffer),
+ decoder->read_ptr.remaining);
+ if (decoder->error) {
+ decoder->error_msg = "xfer_rx_body callback error";
+ return false;
+ }
+
+ decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
+ DEQ_INIT(decoder->incoming);
+
+ return false;
+}
+
+
+// Called when incoming message is complete
+//
+static bool parse_done(http1_conn_t *conn, struct decoder_t *decoder)
+{
+ http1_transfer_t *xfer = decoder->xfer;
+ bool is_response = !decoder->is_request;
+
+ if (!decoder->error) {
+ // signal the message receive is complete
+ conn->config.xfer_rx_done(xfer);
+
+ if (is_response) { // request<->response transfer complete
+
+ // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+ if (IS_INFO_RESPONSE(xfer->response_code)) {
+ xfer->response_code = 0;
+ } else {
+ // The message exchange is complete
+ conn->config.xfer_done(xfer);
+ decoder->xfer = 0;
+ http1_transfer_free(xfer);
+ }
+ }
+
+ decoder_reset(decoder);
+ return !!decoder->read_ptr.remaining;
+ }
+ return false;
+}
+
+
+// Main decode loop.
+// Process received data until it is exhausted
+//
+static int decode_incoming(http1_conn_t *conn)
+{
+ struct decoder_t *decoder = &conn->decoder;
+ bool more = true;
+ while (more && !decoder->error) {
+
+ if (decoder->state == HTTP1_MSG_STATE_START)
+ more = parse_start_line(conn, decoder);
+
+ else if (decoder->state == HTTP1_MSG_STATE_HEADERS)
+ more = parse_header(conn, decoder);
+
+ else if (decoder->state == HTTP1_MSG_STATE_BODY)
+ more = parse_body(conn, decoder);
+
+ // Can reach DONE from any call above.
+ if (decoder->state == HTTP1_MSG_STATE_DONE)
+ more = parse_done(conn, decoder);
+ }
+
+ return decoder->error;
+}
+
+
+void *http1_connection_get_context(http1_conn_t *conn)
+{
+ return conn->context;
+}
+
+// Push inbound network data into the http1 protocol engine.
+//
+// All of the xfer_rx callback will occur in the context of this call. This
+// returns zero on success otherwise an error code. Any error occuring during
+// a callback will be reflected in the return value of this function. It is
+// expected that the caller will call http1_connection_close on a non-zero
+// return value.
+//
+int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len)
+{
+ struct decoder_t *decoder = &conn->decoder;
+ bool init_ptrs = DEQ_IS_EMPTY(decoder->incoming);
+
+ DEQ_APPEND(decoder->incoming, *data);
+
+ if (init_ptrs) {
+ decoder->read_ptr.buffer = DEQ_HEAD(decoder->incoming);
+ decoder->read_ptr.cursor = qd_buffer_base(decoder->read_ptr.buffer);
+ decoder->read_ptr.remaining = len;
+
+ if (decoder->state == HTTP1_MSG_STATE_BODY) {
+ decoder->body_ptr = decoder->read_ptr;
+ decoder->body_ptr.remaining = 0;
+ }
+ } else {
+ decoder->read_ptr.remaining += len;
+ }
+
+ return decode_incoming(conn);
+}
+
+void http1_transfer_set_context(http1_transfer_t *xfer, void *context)
+{
+ xfer->context = context;
+}
+
+void *http1_transfer_get_context(const http1_transfer_t *xfer)
+{
+ return xfer->context;
+}
+
+http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer)
+{
+ return xfer->conn;
+}
+
+
+// initiate a new HTTP request. This creates a new transfer.
+// request = <method>SP<target>SP<version>CRLF
+// Expects version to be in the format "HTTP/X.Y"
+//
+http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version)
+{
+ struct encoder_t *encoder = &conn->encoder;
+ assert(!encoder->xfer); // error: transfer already in progress
+ assert(conn->config.type == HTTP1_CONN_SERVER);
+
+ http1_transfer_t *xfer = encoder->xfer = http1_transfer(conn);
+ encoder->is_request = true;
+ encoder->crlf_sent = false;
+
+ if (strcmp((char*)method, "HEAD") == 0)
+ xfer->is_head_method = true;
+ else if (strcmp((char*)method, "CONNECT") == 0)
+ xfer->is_connect_method = true;
+
+ write_string(encoder, method);
+ write_string(encoder, " ");
+ write_string(encoder, target);
+ write_string(encoder, " ");
+ write_string(encoder, version);
+ write_string(encoder, CRLF);
+
+ return xfer;
+}
+
+
+// Send an HTTP response msg. xfer must correspond to the "oldest" outstanding
+// request that arrived via the xfer_rx_request callback for this connection.
+// version is expected to be in the form "HTTP/x.y"
+// status_code is expected to be 100 <= status_code <= 999
+// status-line = HTTP-version SP status-code SP reason-phrase CRLF
+//
+int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase)
+{
+ http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ struct encoder_t *encoder = &conn->encoder;
+
+ assert(conn->config.type == HTTP1_CONN_CLIENT);
+ assert(!encoder->xfer); // error: transfer already in progress
+ assert(DEQ_HEAD(conn->xfers) == xfer); // error: response not in order!
+ assert(xfer->response_code == 0);
+
+ encoder->xfer = xfer;
+ encoder->is_request = false;
+ encoder->crlf_sent = false;
+ xfer->response_code = status_code;
+
+ char code_str[32];
+ snprintf(code_str, 32, "%d", status_code);
+
+ write_string(encoder, version);
+ write_string(encoder, " ");
+ write_string(encoder, code_str);
+ if (reason_phrase) {
+ write_string(encoder, " ");
+ write_string(encoder, reason_phrase);
+ }
+ write_string(encoder, CRLF);
+
+ return 0;
+}
+
+
+// Add a header field to an outgoing message
+// header-field = field-name ":" OWS field-value OWS
+int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value)
+{
+ http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ struct encoder_t *encoder = &conn->encoder;
+ assert(encoder->xfer == xfer); // xfer not current transfer
+
+ write_string(encoder, key);
+ write_string(encoder, ": ");
+ write_string(encoder, value);
+ write_string(encoder, CRLF);
+
+ // check to see if there are any full buffers that can be sent.
+
+ qd_buffer_list_t blist = DEQ_EMPTY;
+ qd_buffer_t *buf = DEQ_HEAD(encoder->outgoing);
+ size_t octets = 0;
+ while (buf && buf != encoder->write_ptr.buffer) {
+ DEQ_REMOVE_HEAD(encoder->outgoing);
+ DEQ_INSERT_TAIL(blist, buf);
+ octets += qd_buffer_size(buf);
+ }
+ if (!DEQ_IS_EMPTY(blist))
+ conn->config.conn_tx_data(conn, &blist, 0, octets);
+
+ return 0;
+}
+
+
+// just forward the body chain along
+int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len)
+{
+ http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ struct encoder_t *encoder = &conn->encoder;
+
+ fprintf(stderr, "http1_tx_body(offset=%zu size=%zu)\n", offset, len);
+
+ if (!encoder->crlf_sent) {
+ // need to terminate any headers by sending the plain CRLF that follows
+ // the headers
+ write_string(encoder, CRLF);
+
+ // flush all pending output. From this point out the outgoing queue is
+ // no longer used for this message
+ fprintf(stderr, "Flushing before body: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
+ conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+ DEQ_INIT(encoder->outgoing);
+ encoder->write_ptr = NULL_I_PTR;
+ encoder->crlf_sent = true;
+ }
+
+ // skip the outgoing queue and send directly
+ fprintf(stderr, "Sending body data %zu bytes\n", len);
+ conn->config.conn_tx_data(conn, data, offset, len);
+
+ return 0;
+}
+
+
+int http1_tx_done(http1_transfer_t *xfer)
+{
+ http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ struct encoder_t *encoder = &conn->encoder;
+
+ if (!encoder->crlf_sent) {
+ // need to send the plain CRLF that follows the headers
+ write_string(encoder, CRLF);
+
+ // flush all pending output.
+
+ fprintf(stderr, "Flushing at tx_done: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
+ conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+ DEQ_INIT(encoder->outgoing);
+ encoder->write_ptr = NULL_I_PTR;
+ encoder->crlf_sent = true;
+ }
+
+ bool is_response = !encoder->is_request;
+ encoder_reset(encoder);
+
+ if (is_response) {
+ if (IS_INFO_RESPONSE(xfer->response_code)) {
+ // this is a non-terminal response. Another response is expected
+ // for this request so just reset the transfer state
+ xfer->response_code = 0;
+ } else {
+ // The message exchange is complete
+ conn->config.xfer_done(xfer);
+ http1_transfer_free(xfer);
+ }
+ }
+
+ return 0;
+}
+
+
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 86708c8..0b70459 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -30,6 +30,7 @@
#include "qpid/dispatch/protocol_adaptor.h"
#include "delivery.h"
+#include "http_common.h"
#include "http_adaptor.h"
const char *PATH = ":path";
@@ -49,8 +50,6 @@ const char *CONTENT_ENCODING = "content-encoding";
NGHTTP2_NV_FLAG_NONE \
}
-ALLOC_DEFINE(qd_http_lsnr_t);
-ALLOC_DEFINE(qd_http_connector_t);
ALLOC_DEFINE(qdr_http2_session_data_t);
ALLOC_DEFINE(qdr_http2_stream_data_t);
@@ -554,26 +553,6 @@ static void grant_read_buffers(qdr_http_connection_t *conn)
}
-static void free_bridge_config(qd_bridge_config_t *config)
-{
- if (!config) {
- return;
- }
- free(config->host);
- free(config->port);
- free(config->name);
- free(config->address);
- free(config->host_port);
-}
-
-void qd_http_listener_decref(qd_http_lsnr_t* li)
-{
- if (li && sys_atomic_dec(&li->ref_count) == 1) {
- free_bridge_config(&li->config);
- free_qd_http_lsnr_t(li);
- }
-}
-
static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
{
}
@@ -892,18 +871,8 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
return 0;
}
-void qd_http_connector_decref(qd_http_connector_t* c)
+void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector)
{
- if (c && sys_atomic_dec(&c->ref_count) == 1) {
- free_bridge_config(&c->config);
- free_qd_http_connector_t(c);
- }
-}
-
-
-void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
-{
- qd_http_connector_t *connector = (qd_http_connector_t*) impl;
if (connector) {
//TODO: cleanup and close any associated active connections
DEQ_REMOVE(http_adaptor->connectors, connector);
@@ -1102,69 +1071,25 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c
}
-static qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server)
-{
- qd_http_lsnr_t *li = new_qd_http_lsnr_t();
- if (!li)
- return 0;
- ZERO(li);
- sys_atomic_init(&li->ref_count, 1);
- li->server = server;
- li->context.context = li;
- li->context.handler = &handle_listener_event;
- return li;
-}
-
-
-#define CHECK() if (qd_error_code()) goto error
-
-
static const int BACKLOG = 50; /* Listening backlog */
static bool http_listener_listen(qd_http_lsnr_t *li) {
- li->pn_listener = pn_listener();
- if (li->pn_listener) {
- pn_listener_set_context(li->pn_listener, &li->context);
- pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
- sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
- /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
- } else {
- qd_log(http_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s",
- li->config.host_port);
- }
+ pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
+ sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+ /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
return li->pn_listener;
}
-static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity)
-{
- qd_error_clear();
- ZERO(config);
-
- config->name = qd_entity_get_string(entity, "name"); CHECK();
- config->host = qd_entity_get_string(entity, "host"); CHECK();
- config->port = qd_entity_get_string(entity, "port"); CHECK();
- config->address = qd_entity_get_string(entity, "address"); CHECK();
-
- int hplen = strlen(config->host) + strlen(config->port) + 2;
- config->host_port = malloc(hplen);
- snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
-
- return QD_ERROR_NONE;
-
- error:
- free_bridge_config(config);
- return qd_error_code();
-}
-
-qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
- qd_http_lsnr_t *li = qd_http_lsnr(qd->server);
- if (!li || load_bridge_config(qd, &li->config, entity) != QD_ERROR_NONE) {
- qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: %s", qd_error_message());
- qd_http_listener_decref(li);
+ qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &handle_listener_event);
+ if (!li) {
+ qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: no memory");
return 0;
}
+
+ li->config = *config;
//DEQ_ITEM_INIT(li);
DEQ_INSERT_TAIL(http_adaptor->listeners, li);
qd_log(http_adaptor->log_source, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port);
@@ -1173,16 +1098,12 @@ qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *
}
-static qd_http_connector_t *qd_http_connector(qd_server_t *server)
+void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener)
{
- qd_http_connector_t *c = new_qd_http_connector_t();
- if (!c) return 0;
- ZERO(c);
- sys_atomic_init(&c->ref_count, 1);
- c->server = server;
- return c;
+ // TBD?
}
+
static void on_activate(void *context)
{
qdr_http_connection_t* conn = (qdr_http_connection_t*) context;
@@ -1278,20 +1199,18 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
}
-
-qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
qd_http_connector_t *c = qd_http_connector(qd->server);
- if (!c || load_bridge_config(qd, &c->config, entity) != QD_ERROR_NONE) {
- qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message());
- qd_http_connector_decref(c);
+ if (!c) {
+ qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http connector: no memory");
return 0;
}
+ c->config = *config;
DEQ_ITEM_INIT(c);
DEQ_INSERT_TAIL(http_adaptor->connectors, c);
qdr_http_connection_egress(c);
return c;
-
}
static void qdr_http_adaptor_final(void *adaptor_context)
@@ -1302,17 +1221,6 @@ static void qdr_http_adaptor_final(void *adaptor_context)
http_adaptor = NULL;
}
-qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
-{
- return QD_ERROR_NONE;
-}
-
-
-qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
-{
- return QD_ERROR_NONE;
-}
-
/**
* This initialization function will be invoked when the router core is ready for the protocol
* adaptor to be created. This function must:
@@ -1341,7 +1249,7 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
qdr_http_delivery_update,
qdr_http_conn_close,
qdr_http_conn_trace);
- adaptor->log_source = qd_log_source("HTTP_ADAPTOR");
+ adaptor->log_source = qd_log_source(QD_HTTP_LOG_SOURCE);
adaptor->protocol_log_source = qd_log_source("PROTOCOL");
*adaptor_context = adaptor;
DEQ_INIT(adaptor->listeners);
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 5982622..2f0134a 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -25,44 +25,16 @@
#include <qpid/dispatch/log.h>
#include <nghttp2/nghttp2.h>
+
// We already have a qd_http_listener_t defined in http-libwebsockets.c
// We will call this as qd_http_lsnr_t in order to avoid a clash.
// At a later point in time, we will handle websocket here as well
// and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
-typedef struct qd_http_lsnr_t qd_http_lsnr_t;
-typedef struct qd_http_connector_t qd_http_connector_t;
typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
-typedef struct qd_bridge_config_t qd_bridge_config_t;
typedef struct qdr_http2_stream_data_t qdr_http2_stream_data_t;
typedef struct qdr_http_connection_t qdr_http_connection_t;
DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
-struct qd_bridge_config_t {
- char *name;
- char *host;
- char *port;
- char *address;
- char *host_port;
-};
-
-struct qd_http_lsnr_t {
- qd_handler_context_t context;
- sys_atomic_t ref_count;
- qd_server_t *server;
- qd_bridge_config_t config;
- pn_listener_t *pn_listener;
- DEQ_LINKS(qd_http_lsnr_t);
-};
-
-struct qd_http_connector_t {
- sys_atomic_t ref_count;
- qd_server_t *server;
- qd_bridge_config_t config;
- qd_timer_t *timer;
- long delay;
- DEQ_LINKS(qd_http_connector_t);
-};
-
struct qdr_http2_session_data_t {
qd_http2_stream_data_list_t streams; // A session can have many streams.
nghttp2_session *session; // A pointer to the nghttp2s' session object
@@ -110,7 +82,7 @@ struct qdr_http_connection_t {
pn_raw_buffer_t read_buffers[4];
bool ingress;
qd_timer_t *activate_timer;
- qd_bridge_config_t *config;
+ qd_http_bridge_config_t *config;
qd_server_t *server;
uint64_t conn_id;
@@ -126,11 +98,7 @@ struct qdr_http_connection_t {
nghttp2_data_provider data_prd;
};
-DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t);
-ALLOC_DECLARE(qd_http_lsnr_t);
ALLOC_DECLARE(qdr_http2_session_data_t);
-ALLOC_DECLARE(qd_http_connector_t);
ALLOC_DECLARE(qdr_http2_stream_data_t);
-DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c
new file mode 100644
index 0000000..b3a6fc9
--- /dev/null
+++ b/src/adaptors/http_common.c
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http_common.h"
+
+#include <proton/listener.h>
+
+#include <stdio.h>
+
+ALLOC_DECLARE(qd_http_lsnr_t);
+ALLOC_DEFINE(qd_http_lsnr_t);
+ALLOC_DECLARE(qd_http_connector_t);
+ALLOC_DEFINE(qd_http_connector_t);
+
+
+static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity)
+{
+ char *version_str = 0;
+
+ qd_error_clear();
+ ZERO(config);
+
+#define CHECK() if (qd_error_code()) goto error
+ config->name = qd_entity_get_string(entity, "name"); CHECK();
+ config->host = qd_entity_get_string(entity, "host"); CHECK();
+ config->port = qd_entity_get_string(entity, "port"); CHECK();
+ config->address = qd_entity_get_string(entity, "address"); CHECK();
+ version_str = qd_entity_get_string(entity, "protcolVersion"); CHECK();
+
+ if (strncmp(version_str, "HTTP/1", 6) == 0) {
+ config->version = VERSION_HTTP1;
+ } else {
+ config->version = VERSION_HTTP2;
+ }
+ free(version_str);
+ version_str = 0;
+
+ int hplen = strlen(config->host) + strlen(config->port) + 2;
+ config->host_port = malloc(hplen);
+ snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
+
+ return QD_ERROR_NONE;
+
+error:
+ qd_http_free_bridge_config(config);
+ free(version_str);
+ return qd_error_code();
+}
+
+
+void qd_http_free_bridge_config(qd_http_bridge_config_t *config)
+{
+ if (!config) {
+ return;
+ }
+ free(config->host);
+ free(config->port);
+ free(config->name);
+ free(config->address);
+ free(config->host_port);
+}
+
+
+//
+// HTTP Listener Management (HttpListenerEntity)
+//
+
+
+qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+ qd_http_lsnr_t *listener = 0;
+ qd_http_bridge_config_t config;
+
+ if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
+ qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
+ "Unable to create http listener: %s", qd_error_message());
+ return 0;
+ }
+
+ switch (config.version) {
+ case VERSION_HTTP1:
+ listener = qd_http1_configure_listener(qd, &config, entity);
+ break;
+ case VERSION_HTTP2:
+ listener = qd_http2_configure_listener(qd, &config, entity);
+ break;
+ }
+
+ if (!listener)
+ qd_http_free_bridge_config(&config);
+
+ return listener;
+}
+
+
+void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl)
+{
+ qd_http_lsnr_t *listener = (qd_http_lsnr_t*) impl;
+ if (listener) {
+ switch (listener->config.version) {
+ case VERSION_HTTP1:
+ qd_http1_delete_listener(qd, listener);
+ break;
+ case VERSION_HTTP2:
+ qd_http2_delete_listener(qd, listener);
+ break;
+ }
+ }
+}
+
+
+qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
+}
+
+
+//
+// HTTP Connector Management (HttpConnectorEntity)
+//
+
+
+qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+ qd_http_connector_t *conn = 0;
+ qd_http_bridge_config_t config;
+
+ if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
+ qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
+ "Unable to create http connector: %s", qd_error_message());
+ return 0;
+ }
+
+ switch (config.version) {
+ case VERSION_HTTP1:
+ conn = qd_http1_configure_connector(qd, &config, entity);
+ break;
+ case VERSION_HTTP2:
+ conn = qd_http2_configure_connector(qd, &config, entity);
+ break;
+ }
+
+ if (!conn)
+ qd_http_free_bridge_config(&config);
+
+ return conn;
+}
+
+
+void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
+{
+ qd_http_connector_t *conn = (qd_http_connector_t*) impl;
+
+ if (conn) {
+ switch (conn->config.version) {
+ case VERSION_HTTP1:
+ qd_http1_delete_connector(qd, conn);
+ break;
+ case VERSION_HTTP2:
+ qd_http2_delete_connector(qd, conn);
+ break;
+ }
+ }
+}
+
+qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
+}
+
+//
+// qd_http_lsnr_t constructor
+//
+
+qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler)
+{
+ qd_http_lsnr_t *li = new_qd_http_lsnr_t();
+ if (!li)
+ return 0;
+ ZERO(li);
+
+ li->pn_listener = pn_listener();
+ if (!li->pn_listener) {
+ free_qd_http_lsnr_t(li);
+ return 0;
+ }
+
+ sys_atomic_init(&li->ref_count, 1);
+ li->server = server;
+ li->context.context = li;
+ li->context.handler = handler;
+ pn_listener_set_context(li->pn_listener, &li->context);
+
+ return li;
+}
+
+void qd_http_listener_decref(qd_http_lsnr_t* li)
+{
+ if (li && sys_atomic_dec(&li->ref_count) == 1) {
+ qd_http_free_bridge_config(&li->config);
+ free_qd_http_lsnr_t(li);
+ }
+}
+
+//
+// qd_http_connector_t constructor
+//
+
+qd_http_connector_t *qd_http_connector(qd_server_t *server)
+{
+ qd_http_connector_t *c = new_qd_http_connector_t();
+ if (!c) return 0;
+ ZERO(c);
+ sys_atomic_init(&c->ref_count, 1);
+ c->server = server;
+ return c;
+}
+
+void qd_http_connector_decref(qd_http_connector_t* c)
+{
+ if (c && sys_atomic_dec(&c->ref_count) == 1) {
+ qd_http_free_bridge_config(&c->config);
+ free_qd_http_connector_t(c);
+ }
+}
+
+
diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h
new file mode 100644
index 0000000..e8a7ff9
--- /dev/null
+++ b/src/adaptors/http_common.h
@@ -0,0 +1,108 @@
+#ifndef __http_common_h__
+#define __http_common_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/atomic.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/timer.h>
+
+#include "delivery.h"
+#include "entity.h"
+
+#define QD_HTTP_LOG_SOURCE "HTTP_ADAPTOR"
+
+typedef enum {
+ VERSION_HTTP1,
+ VERSION_HTTP2,
+} qd_http_version_t;
+
+typedef struct qd_http_bridge_config_t {
+ char *name;
+ char *host;
+ char *port;
+ char *address;
+ char *host_port;
+ qd_http_version_t version;
+} qd_http_bridge_config_t;
+
+void qd_http_free_bridge_config(qd_http_bridge_config_t *config);
+
+typedef struct qd_http_lsnr_t qd_http_lsnr_t;
+struct qd_http_lsnr_t {
+ qd_http_bridge_config_t config;
+ qd_handler_context_t context;
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ pn_listener_t *pn_listener;
+ DEQ_LINKS(qd_http_lsnr_t);
+};
+DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t);
+
+qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler);
+void qd_http_listener_decref(qd_http_lsnr_t* li);
+
+typedef struct qd_http_connector_t qd_http_connector_t;
+struct qd_http_connector_t {
+ qd_http_bridge_config_t config;
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ qd_timer_t *timer;
+ long delay;
+ struct qdr_http_connection_t *dispatcher; // pseudo egress connection
+ DEQ_LINKS(qd_http_connector_t);
+};
+DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
+
+qd_http_connector_t *qd_http_connector(qd_server_t *server);
+void qd_http_connector_decref(qd_http_connector_t* c);
+
+
+
+//
+// Management Entity Interfaces (see HttpListenerEntity and HttpConnectorEntity in agent.py)
+//
+
+qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity);
+void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl);
+qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl);
+
+qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity);
+void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl);
+qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl);
+
+//
+// These functions are defined in their respective HTTP adaptors:
+//
+
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+
+void qd_http1_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *);
+void qd_http2_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *);
+
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *);
+
+void qd_http1_delete_connector(qd_dispatch_t *, qd_http_connector_t *);
+void qd_http2_delete_connector(qd_dispatch_t *, qd_http_connector_t *);
+
+
+#endif // __http_common_h__
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index efd102f..8553e4f 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -39,7 +39,7 @@ DUMMY = "org.apache.qpid.dispatch.dummy"
CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
-TOTAL_ENTITIES=29 # for tests that check the total # of entities
+TOTAL_ENTITIES=31 # for tests that check the total # of entities
class QdmanageTest(TestCase):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org