You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/09/30 19:24:03 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1744: HTTP1 protocol adaptor (WIP)
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new 1c054ad DISPATCH-1744: HTTP1 protocol adaptor (WIP)
1c054ad is described below
commit 1c054ad1196ccecd6574ad1413a535814d513402
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Aug 13 10:15:07 2020 -0400
DISPATCH-1744: HTTP1 protocol adaptor (WIP)
This closes #855
---
include/qpid/dispatch/http1_codec.h | 229 ++++
include/qpid/dispatch/http1_lib.h | 118 --
python/qpid_dispatch/management/qdrouter.json | 1 +
src/CMakeLists.txt | 5 +-
src/adaptors/adaptor_utils.c | 147 +++
src/adaptors/adaptor_utils.h | 54 +
src/adaptors/http1/http1_adaptor.c | 707 +++++++++-
src/adaptors/http1/http1_client.c | 1306 +++++++++++++++++++
src/adaptors/http1/{http1_lib.c => http1_codec.c} | 634 +++++----
src/adaptors/http1/http1_private.h | 272 ++++
src/adaptors/http1/http1_server.c | 1430 +++++++++++++++++++++
src/adaptors/http_common.c | 6 +-
src/adaptors/http_common.h | 3 +-
src/message.c | 2 +-
tests/CMakeLists.txt | 1 +
tests/system_tests_http1_adaptor.py | 722 +++++++++++
16 files changed, 5240 insertions(+), 397 deletions(-)
diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
new file mode 100644
index 0000000..3682dd1
--- /dev/null
+++ b/include/qpid/dispatch/http1_codec.h
@@ -0,0 +1,229 @@
+#ifndef http1_codec_H
+#define http1_codec_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/message.h>
+
+#include <inttypes.h>
+#include <stdbool.h>
+
+
+// HTTP/1.x Encoder/Decoder Library
+//
+// This library provides an API for encoding and decoding HTTP/1.x messages.
+//
+// The decoder takes qd_buffer_t chains containing HTTP/1.x data read from the
+// TCP connection and issues callbacks as various parts (headers, body, status)
+// of the HTTP/1.x message are parsed.
+//
+// The encoder allows the application to construct an HTTP/1.x message. An API
+// is provided for building the message and callbacks are invoked when the
+// encoder has full qd_buffer_t or body_data to send out the TCP connection.
+//
+// This library provides two classes:
+//
+// * h1_codec_connection_t - a context for a single TCP connection over which
+// HTTP/1.x messages are exchanged.
+//
+// * h1_codec_request_state_t - a context which tracks the state of a single
+// HTTP/1.x Request <-> Response message exchange. Multiple
+// h1_codec_request_state_t can be associated with an h1_codec_connection_t due to
+// request pipelining.
+//
+
+
+#define HTTP1_VERSION_1_1 "HTTP/1.1"
+#define HTTP1_VERSION_1_0 "HTTP/1.0"
+
+typedef struct h1_codec_connection_t h1_codec_connection_t;
+typedef struct h1_codec_request_state_t h1_codec_request_state_t;
+
+
+typedef enum {
+ HTTP1_CONN_CLIENT, // connection initiated by client
+ HTTP1_CONN_SERVER, // connection to server
+} h1_codec_connection_type_t;
+
+
+typedef enum {
+ HTTP1_STATUS_BAD_REQ = 400,
+ HTTP1_STATUS_SERVER_ERR = 500,
+ HTTP1_STATUS_BAD_VERSION = 505,
+ HTTP1_STATUS_SERVICE_UNAVAILABLE = 503,
+} h1_codec_status_code_t;
+
+
+typedef struct h1_codec_config_t {
+
+ h1_codec_connection_type_t type;
+
+ // Callbacks to send data out the raw connection. These callbacks are
+ // triggered by the message creation API (h1_codec_tx_*) Note well: these
+ // callbacks are called in the order in which the data must be written out
+ // the raw connection!
+
+ // tx_buffers()
+ // Send a list of buffers containing encoded HTTP message data. The caller
+ // assumes ownership of the buffer list and must release the buffers when
+ // done. len is set to the total octets of data in the list.
+ //
+ void (*tx_buffers)(h1_codec_request_state_t *hrs, qd_buffer_list_t *data, unsigned int len);
+
+ // tx_body_data()
+ // Called with body_data containing encoded HTTP message data. Only
+ // called if the outgoing HTTP message has a body. The caller assumes
+ // ownership of the body_data and must release it when done.
+ //
+ void (*tx_body_data)(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+
+ //
+ // RX message callbacks
+ //
+ // These callbacks should return 0 on success or non-zero on error. A
+ // non-zero return code is used as the return code from
+ // h1_codec_connection_rx_data()
+ //
+
+ // HTTP request received - new h1_codec_request_state_t created (hrs). This
+ // hrs must be supplied in the h1_codec_tx_response() method when sending the
+ // response.
+ int (*rx_request)(h1_codec_request_state_t *hrs,
+ const char *method,
+ const char *target,
+ uint32_t version_major,
+ uint32_t version_minor);
+
+ // HTTP response received - the h1_codec_request_state_t comes from the return
+ // value of the h1_codec_tx_request() method used to create the corresponding
+ // request. Note well that if status_code is Informational (1xx) then this
+ // response is NOT the last response for the current request (See RFC7231,
+ // 6.2 Informational 1xx). The request_done callback will be called after
+ // the LAST response has been received.
+ //
+ int (*rx_response)(h1_codec_request_state_t *hrs,
+ int status_code,
+ const char *reason_phrase,
+ uint32_t version_major,
+ uint32_t version_minor);
+
+ int (*rx_header)(h1_codec_request_state_t *hrs, const char *key, const char *value);
+ int (*rx_headers_done)(h1_codec_request_state_t *hrs, bool has_body);
+
+ int (*rx_body)(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, uintmax_t len, bool more);
+
+ // Invoked after a received HTTP message has been completely parsed.
+ //
+ void (*rx_done)(h1_codec_request_state_t *hrs);
+
+ // Invoked when the final response message has been decoded (server
+ // connection) or encoded (client connection), or the request has been cancelled.
+ // hrs is freed on return from this callback and must not be referenced further.
+ void (*request_complete)(h1_codec_request_state_t *hrs,
+ bool cancelled);
+
+} h1_codec_config_t;
+
+
+// create a new connection and assign it a context
+//
+h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context);
+void *h1_codec_connection_get_context(h1_codec_connection_t *conn);
+
+// Notify the codec that the endpoint closed the connection. This should be
+// called for server connections only. Once the server has reconnected it is
+// safe to resume calling h1_codec_connection_rx_data(). This method is a
+// no-op for client connections. When a client connection closes the
+// application must cancel all outstanding requests and then call
+// h1_codec_connection_free() instead.
+//
+void h1_codec_connection_closed(h1_codec_connection_t *conn);
+
+// Release the codec. This can only be done after all outstanding requests
+// have been completed or cancelled.
+//
+void h1_codec_connection_free(h1_codec_connection_t *conn);
+
+// Push inbound network data into the http1 library. All rx_*() callbacks occur
+// during this call. The return value is zero on success. If a non-zero value
+// is returned the codec state is unknown - the application must cancel all
+// outstanding requests and destroy the conn by calling
+// h1_codec_connection_free().
+//
+int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, uintmax_t len);
+
+void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context);
+void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs);
+h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs);
+
+// Cancel the request. The h1_codec_request_state_t is freed during this call.
+// The request_complete callback will be invoked during this call with
+// cancelled=True.
+//
+void h1_codec_request_state_cancel(h1_codec_request_state_t *hrs);
+
+const char *h1_codec_request_state_method(const h1_codec_request_state_t *hrs);
+
+// true when codec has encoded/decoded a complete request message
+bool h1_codec_request_complete(const h1_codec_request_state_t *hrs);
+
+// true when codec has encoded/decoded a complete response message
+bool h1_codec_response_complete(const h1_codec_request_state_t *hrs);
+
+//
+// API for sending HTTP/1.x messages
+//
+// The tx_msg_headers and tx_msg_body callbacks can occur during any of these
+// calls.
+//
+
+
+// initiate a request - this creates a new request state context
+//
+h1_codec_request_state_t *h1_codec_tx_request(h1_codec_connection_t *conn, const char *method, const char *target,
+ uint32_t version_major, uint32_t version_minor);
+
+// Respond to a received request - the request state context should be the one
+// supplied during the corresponding rx_request callback. It is required that
+// the caller issues responses in the same order as requests arrive.
+//
+int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const char *reason_phrase,
+ uint32_t version_major, uint32_t version_minor);
+
+// add header to outgoing message
+//
+int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value);
+
+// Stream outgoing body data. Ownership of body_data is passed to the caller.
+//
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+
+// outgoing message construction complete. The request_complete() callback MAY
+// occur during this call.
+//
+// need_close: set to true if the message is a response that does not provide
+// an explict body length. If true it is up to the caller to close the
+// underlying socket connection after all outgoing data for this request has
+// been sent.
+//
+int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close);
+
+
+#endif // http1_codec_H
diff --git a/include/qpid/dispatch/http1_lib.h b/include/qpid/dispatch/http1_lib.h
deleted file mode 100644
index 58385e0..0000000
--- a/include/qpid/dispatch/http1_lib.h
+++ /dev/null
@@ -1,118 +0,0 @@
-#ifndef http1_lib_H
-#define http1_lib_H 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <qpid/dispatch/buffer.h>
-
-#include <inttypes.h>
-
-#define HTTP1_VERSION_1_1 "HTTP/1.1"
-#define HTTP1_VERSION_1_0 "HTTP/1.0"
-
-typedef struct http1_conn_t http1_conn_t;
-typedef struct http1_transfer_t http1_transfer_t;
-
-
-typedef enum {
- HTTP1_CONN_CLIENT, // connection initiated by client
- HTTP1_CONN_SERVER, // connection to server
-} http1_conn_type_t;
-
-typedef enum {
- HTTP1_STATUS_BAD_REQ = 400,
- HTTP1_STATUS_SERVER_ERR = 500,
- HTTP1_STATUS_BAD_VERSION = 505,
-} http1_status_code_t;
-
-typedef struct http1_conn_config_t {
-
- http1_conn_type_t type;
-
- // called with output data to write to the network
- void (*conn_tx_data)(http1_conn_t *conn, qd_buffer_list_t *data, size_t offset, unsigned int len);
-
- // @TODO(kgiusti) - remove?
- //void (*conn_error)(http1_conn_t *conn, int code, const char *reason);
-
- //
- // RX message callbacks
- //
-
- // HTTP request received - new transfer created (xfer). This xfer must be
- // supplied in the http1_response() method
- int (*xfer_rx_request)(http1_transfer_t *xfer,
- const char *method,
- const char *target,
- const char *version);
-
- // HTTP response received - the transfer comes from the return value of the
- // corresponding http1_request method. Note well that if status_code is
- // Informational (1xx) then this response is NOT the last response for the
- // current request (See RFC7231, 6.2 Informational 1xx). The xfer_done
- // callback will be called after the LAST response has been received.
- //
- int (*xfer_rx_response)(http1_transfer_t *xfer,
- const char *version,
- int status_code,
- const char *reason_phrase);
-
- int (*xfer_rx_header)(http1_transfer_t *xfer, const char *key, const char *value);
- int (*xfer_rx_headers_done)(http1_transfer_t *xfer);
-
- int (*xfer_rx_body)(http1_transfer_t *xfer, qd_buffer_list_t *body, size_t offset, size_t len);
-
- void (*xfer_rx_done)(http1_transfer_t *xfer);
-
- // Invoked when the request/response(s) exchange has completed
- //
- void (*xfer_done)(http1_transfer_t *xfer);
-} http1_conn_config_t;
-
-
-http1_conn_t *http1_connection(http1_conn_config_t *config, void *context);
-void http1_connection_close(http1_conn_t *conn);
-void *http1_connection_get_context(http1_conn_t *conn);
-
-// push inbound network data into the http1 library
-int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len);
-
-
-//
-// API for sending HTTP/1.1 messages
-//
-void http1_transfer_set_context(http1_transfer_t *xfer, void *context);
-void *http1_transfer_get_context(const http1_transfer_t *xfer);
-http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer);
-
-// initiate a request - this creates a new message transfer context
-http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version);
-
-// respond to a received request - the transfer context should be from the corresponding xfer_rx_request callback
-int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase);
-
-int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value);
-int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len);
-int http1_tx_done(http1_transfer_t *xfer);
-
-
-
-
-
-#endif // http1_lib_H
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index c9deff4..2cf54a4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1135,6 +1135,7 @@
"host": {
"description":"IP address: ipv4 or ipv6 literal or a host name",
"type": "string",
+ "default": "127.0.0.1",
"create": true
},
"port": {
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 92b0c5f..103cbaa 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,10 +39,13 @@ add_custom_command (
# Build the qpid-dispatch library.
set(qpid_dispatch_SOURCES
adaptors/reference_adaptor.c
+ adaptors/adaptor_utils.c
adaptors/http_common.c
adaptors/http2/http2_adaptor.c
- adaptors/http1/http1_lib.c
+ adaptors/http1/http1_codec.c
adaptors/http1/http1_adaptor.c
+ adaptors/http1/http1_client.c
+ adaptors/http1/http1_server.c
adaptors/tcp_adaptor.c
alloc_pool.c
amqp.c
diff --git a/src/adaptors/adaptor_utils.c b/src/adaptors/adaptor_utils.c
new file mode 100644
index 0000000..562f488
--- /dev/null
+++ b/src/adaptors/adaptor_utils.c
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "adaptor_utils.h"
+
+#include <proton/netaddr.h>
+#include <string.h>
+
+
+#define RAW_BUFFER_BATCH 16
+
+
+char *qda_raw_conn_get_address(pn_raw_connection_t *socket)
+{
+ const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket);
+ char buffer[1024];
+ int len = pn_netaddr_str(netaddr, buffer, 1024);
+ if (len <= 1024) {
+ return strdup(buffer);
+ } else {
+ return strndup(buffer, 1024);
+ }
+}
+
+
+void qda_raw_conn_get_read_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, uintmax_t *length)
+{
+ pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+
+ DEQ_INIT(*blist);
+ uintmax_t len = 0;
+ size_t n;
+ while ((n = pn_raw_connection_take_read_buffers(conn, buffs, RAW_BUFFER_BATCH)) != 0) {
+ for (size_t i = 0; i < n; ++i) {
+ qd_buffer_t *qd_buf = (qd_buffer_t*)buffs[i].context;
+ assert(qd_buf);
+ if (buffs[i].size) {
+ // set content length:
+ qd_buffer_insert(qd_buf, buffs[i].size);
+ len += buffs[i].size;
+ DEQ_INSERT_TAIL(*blist, qd_buf);
+ } else { // ignore empty buffers
+ qd_buffer_free(qd_buf);
+ }
+ }
+ }
+
+ *length = len;
+}
+
+
+int qda_raw_conn_grant_read_buffers(pn_raw_connection_t *conn)
+{
+ pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+ if (pn_raw_connection_is_read_closed(conn))
+ return 0;
+
+ int granted = 0;
+ size_t count = pn_raw_connection_read_buffers_capacity(conn);
+ while (count) {
+ size_t batch_ct = 0;
+ for (int i = 0; i < RAW_BUFFER_BATCH; ++i) {
+ qd_buffer_t *buf = qd_buffer();
+ buffs[i].context = (intptr_t)buf;
+ buffs[i].bytes = (char*) qd_buffer_base(buf);
+ buffs[i].capacity = qd_buffer_capacity(buf);
+ buffs[i].size = 0;
+ buffs[i].offset = 0;
+ batch_ct += 1;
+ count -= 1;
+ if (count == 0)
+ break;
+ }
+ pn_raw_connection_give_read_buffers(conn, buffs, batch_ct);
+ granted += batch_ct;
+ }
+
+ return granted;
+}
+
+
+int qda_raw_conn_write_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, size_t offset)
+{
+ pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+ size_t count = pn_raw_connection_write_buffers_capacity(conn);
+ count = MIN(count, DEQ_SIZE(*blist));
+ int sent = 0;
+
+ // Since count is set to ensure that we never run out of capacity or
+ // buffers to send we can avoid checking that on every loop
+
+ while (count) {
+ size_t batch_ct = 0;
+
+ for (int i = 0; i < RAW_BUFFER_BATCH; ++i) {
+ qd_buffer_t *buf = DEQ_HEAD(*blist);
+ DEQ_REMOVE_HEAD(*blist);
+
+ buffs[i].context = (intptr_t)buf;
+ buffs[i].bytes = (char*)qd_buffer_base(buf);
+ buffs[i].capacity = 0;
+ buffs[i].size = qd_buffer_size(buf) - offset;
+ buffs[i].offset = offset;
+ offset = 0; // all succeeding bufs start at base
+
+ batch_ct += 1;
+ count -= 1;
+ if (count == 0)
+ break;
+ }
+ pn_raw_connection_write_buffers(conn, buffs, batch_ct);
+ sent += batch_ct;
+ }
+
+ return sent;
+}
+
+
+void qda_raw_conn_free_write_buffers(pn_raw_connection_t *conn)
+{
+ pn_raw_buffer_t buffs[RAW_BUFFER_BATCH];
+ size_t n;
+ while ((n = pn_raw_connection_take_written_buffers(conn, buffs, RAW_BUFFER_BATCH)) != 0) {
+ for (size_t i = 0; i < n; ++i) {
+ qd_buffer_t *qd_buf = (qd_buffer_t*)buffs[i].context;
+ assert(qd_buf);
+ qd_buffer_free(qd_buf);
+ }
+ }
+}
+
diff --git a/src/adaptors/adaptor_utils.h b/src/adaptors/adaptor_utils.h
new file mode 100644
index 0000000..eb1bed0
--- /dev/null
+++ b/src/adaptors/adaptor_utils.h
@@ -0,0 +1,54 @@
+#ifndef __adaptor_utils_h__
+#define __adaptor_utils_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+#include <proton/raw_connection.h>
+
+// Get the raw connections remote address
+// Caller must free() the result when done.
+//
+char *qda_raw_conn_get_address(pn_raw_connection_t *socket);
+
+// Retrieve all available incoming data buffers from the raw connection.
+// Return the result in blist, with the total number of read octets in *length
+// Note: only those buffers containing data (size != 0) are returned.
+//
+void qda_raw_conn_get_read_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, uintmax_t *length);
+
+// allocate empty read buffers to the raw connection. This will provide enough buffers
+// to fill the connections capacity. Returns the number of buffers granted.
+//
+int qda_raw_conn_grant_read_buffers(pn_raw_connection_t *conn);
+
+
+// Write blist buffers to the connection. Buffers are removed from the HEAD of
+// blist. Returns the actual number of buffers taken. offset is an optional
+// offset to the first outgoing octet in the HEAD buffer.
+//
+int qda_raw_conn_write_buffers(pn_raw_connection_t *conn, qd_buffer_list_t *blist, size_t offset);
+
+
+// release all sent buffers held by the connection
+//
+void qda_raw_conn_free_write_buffers(pn_raw_connection_t *conn);
+
+
+#endif // __adaptor_utils_h__
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index beb9eb4..d9d4e4c 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -17,40 +17,709 @@
* under the License.
*/
-#include <qpid/dispatch/http1_lib.h>
-#include <qpid/dispatch/protocol_adaptor.h>
-#include <qpid/dispatch/message.h>
-#include "adaptors/http_common.h"
+#include "http1_private.h"
#include <stdio.h>
#include <inttypes.h>
-typedef struct qd_http1_adaptor_t {
- qdr_core_t *core;
- qdr_protocol_adaptor_t *adaptor;
- qd_http_lsnr_list_t listeners;
- qd_http_connector_list_t connectors;
- qd_log_source_t *log;
-} qd_http1_adaptor_t;
+//
+// This file contains code for the HTTP/1.x protocol adaptor. This file
+// includes code that is common to both ends of the protocol (e.g. client and
+// server processing). See http1_client.c and http1_server.c for code specific
+// to HTTP endpoint processing.
+//
-//static qd_http1_adaptor_t *http1_adaptor;
+// for debug: will dump raw buffer content to stdout if true
+#define HTTP1_DUMP_BUFFERS false
-#define BUFF_BATCH 16
+#define RAW_BUFFER_BATCH 16
-// dummy for now:
-qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+/*
+ HTTP/1.x <--> AMQP message mapping
+
+ Message Properties Section:
+
+ HTTP Message AMQP Message Properties
+ ------------ -----------------------
+ Request Method subject field
+
+ Application Properties Section:
+
+ HTTP Message AMQP Message App Properties Map
+ ------------ -------------------------------
+ Request Version "http:request": "<version|1.1 default>"
+ Response Version "http:response": "<version|1.1 default>"
+ Response Status Code "http:status": <int32>
+ Response Reason "http:reason": <string>
+ Request Target "http:target": <string>
+ * "<lowercase(key)>" <string>
+
+ Notes:
+ - Message App Properties Keys that start with "http:" are reserved by the
+ adaptor for meta-data.
+ */
+
+// @TODO(kgiusti): rx complete + abort ingress deliveries when endpoint dies while msg in flight
+
+
+ALLOC_DEFINE(qdr_http1_out_data_t);
+ALLOC_DEFINE(qdr_http1_connection_t);
+
+
+qdr_http1_adaptor_t *qdr_http1_adaptor;
+
+
+void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq)
{
- return 0;
+ if (hreq) {
+ DEQ_REMOVE(hreq->hconn->requests, hreq);
+ h1_codec_request_state_cancel(hreq->lib_rs);
+ free(hreq->response_addr);
+ }
}
-void qd_http1_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) {}
-qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
{
+ if (hconn) {
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_REMOVE(qdr_http1_adaptor->connections, hconn);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ // request expected to be clean up by caller
+#if 0 // JIRA ME!
+ assert(DEQ_IS_EMPTY(hconn->requests));
+#endif
+
+ h1_codec_connection_free(hconn->http_conn);
+ if (hconn->raw_conn) {
+ pn_raw_connection_set_context(hconn->raw_conn, 0);
+ pn_raw_connection_close(hconn->raw_conn);
+ }
+#if 0
+ if (hconn->out_link) {
+ qdr_link_set_context(hconn->out_link, 0);
+ qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+ }
+ if (hconn->in_link) {
+ qdr_link_set_context(hconn->in_link, 0);
+ qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+ }
+ if (hconn->qdr_conn) {
+ qdr_connection_set_context(hconn->qdr_conn, 0);
+ qdr_connection_closed(hconn->qdr_conn);
+ }
+#endif
+
+ free(hconn->cfg.host);
+ free(hconn->cfg.port);
+ free(hconn->cfg.address);
+ free(hconn->cfg.host_port);
+
+ free(hconn->client.client_ip_addr);
+ free(hconn->client.reply_to_addr);
+
+ qd_timer_free(hconn->server.reconnect_timer);
+
+ free_qdr_http1_connection_t(hconn);
+ }
+}
+
+
+void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
+{
+ if (out_data) {
+ // expect: all buffers returned from proton!
+ assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
+ qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+ while (od) {
+ DEQ_REMOVE_HEAD(out_data->fifo);
+ if (od->body_data)
+ qd_message_body_data_release(od->body_data);
+ else
+ qd_buffer_list_free_buffers(&od->raw_buffers);
+ free_qdr_http1_out_data_t(od);
+ od = DEQ_HEAD(out_data->fifo);
+ }
+ }
+}
+
+
+// Return the number of buffers in the process of being written out by the proactor.
+// These buffers are "owned" by proton - they must not be freed until proton has
+// released them.
+//
+int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data)
+{
+ int count = 0;
+ if (out_data) {
+ qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+ while (od) {
+ count += od->next_buffer - od->free_count;
+ if (od == out_data->write_ptr)
+ break;
+
+ od = DEQ_NEXT(od);
+ }
+ }
+ return count;
+}
+
+
+// Initiate close of the raw connection.
+//
+void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error)
+{
+ if (error) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+ "[C%"PRIu64"] Connection closing: %s", hconn->conn_id, error);
+ }
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
+
+ if (hconn->raw_conn) {
+ hconn->close_connection = true;
+ pn_raw_connection_close(hconn->raw_conn);
+ }
+
+ // clean up all connection related stuff on PN_RAW_CONNECTION_DISCONNECTED
+ // event
+}
+
+
+void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
+ const qdr_error_t *error)
+{
+ char *reason = 0;
+ if (error) {
+ size_t len = 0;
+ char *ename = qdr_error_name(error);
+ char *edesc = qdr_error_description(error);
+ if (ename) len += strlen(ename);
+ if (edesc) len += strlen(edesc);
+ if (len) {
+ reason = qd_malloc(len + 2);
+ reason[0] = 0;
+ if (ename) {
+ strcat(reason, ename);
+ strcat(reason, " ");
+ }
+ if (edesc)
+ strcat(reason, edesc);
+ }
+ free(ename);
+ free(edesc);
+ }
+
+ qdr_http1_error_response(hreq, HTTP1_STATUS_BAD_REQ,
+ reason ? reason : "Invalid Request");
+ free(reason);
+}
+
+
+// send a server error response
+//
+void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
+ int error_code,
+ const char *reason)
+{
+ if (hreq->lib_rs) {
+ bool ignored;
+ h1_codec_tx_response(hreq->lib_rs, error_code, reason, 1, 1);
+ h1_codec_tx_done(hreq->lib_rs, &ignored);
+ }
+}
+
+
+const char *qdr_http1_token_list_next(const char *start, size_t *len, const char **next)
+{
+ static const char *SKIPME = ", \t";
+
+ *len = 0;
+ *next = 0;
+
+ if (!start) return 0;
+
+ while (*start && strchr(SKIPME, *start))
+ ++start;
+
+ if (!*start) return 0;
+
+ const char *end = start;
+ while (*end && !strchr(SKIPME, *end))
+ ++end;
+
+ *len = end - start;
+ *next = end;
+
+ while (**next && strchr(SKIPME, **next))
+ ++(*next);
+
+ return start;
+}
+
+
+//
+// Raw Connection Write Buffer Management
+//
+
+
+// Write pending data out the raw connection. Preserve order by only writing
+// the head request data.
+//
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo)
+{
+ pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
+ size_t count = !hconn->raw_conn || pn_raw_connection_is_write_closed(hconn->raw_conn)
+ ? 0
+ : pn_raw_connection_write_buffers_capacity(hconn->raw_conn);
+
+ uint64_t total_octets = 0;
+ qdr_http1_out_data_t *od = fifo->write_ptr;
+ while (count > 0 && od) {
+ qd_buffer_t *wbuf = 0;
+ int od_len = MIN(count,
+ (od->buffer_count - od->next_buffer));
+ assert(od_len); // error: no data @ head?
+
+ // send the out_data as a series of writes to proactor
+
+ while (od_len) {
+ size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
+ int written = 0;
+
+ if (od->body_data) { // buffers stored in qd_message_t
+
+ written = qd_message_body_data_buffers(od->body_data, buffers, od->next_buffer, limit);
+ for (int i = 0; i < written; ++i) {
+ // enforce this: we expect the context can be used by the adaptor!
+ assert(buffers[i].context == 0);
+ buffers[i].context = (uintptr_t)od;
+ total_octets += buffers[i].size;
+ }
+
+ } else { // list of buffers in od->raw_buffers
+ // advance to next buffer to send in od
+ if (!wbuf) {
+ wbuf = DEQ_HEAD(od->raw_buffers);
+ for (int i = 0; i < od->next_buffer; ++i)
+ wbuf = DEQ_NEXT(wbuf);
+ }
+
+ pn_raw_buffer_t *rdisc = &buffers[0];
+ while (limit--) {
+ rdisc->context = (uintptr_t)od;
+ rdisc->bytes = (char*) qd_buffer_base(wbuf);
+ rdisc->capacity = 0;
+ rdisc->size = qd_buffer_size(wbuf);
+ rdisc->offset = 0;
+
+ total_octets += rdisc->size;
+ ++rdisc;
+ wbuf = DEQ_NEXT(wbuf);
+ written += 1;
+ }
+ }
+
+ // keep me, you'll need it
+ if (HTTP1_DUMP_BUFFERS) {
+ for (size_t j = 0; j < written; ++j) {
+ char *ptr = (char*) buffers[j].bytes;
+ int len = (int) buffers[j].size;
+ fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%d\n value='%.*s'\n",
+ hconn->conn_id, (void*)ptr, len, len, ptr);
+ fflush(stdout);
+ }
+ }
+
+ written = pn_raw_connection_write_buffers(hconn->raw_conn, buffers, written);
+ count -= written;
+ od_len -= written;
+ od->next_buffer += written;
+ }
+
+ if (od->next_buffer == od->buffer_count) {
+ // all buffers in od have been passed to proton.
+ od = DEQ_NEXT(od);
+ fifo->write_ptr = od;
+ wbuf = 0;
+ }
+ }
+
+ hconn->out_http1_octets += total_octets;
+ return total_octets;
+}
+
+
+// The HTTP encoder has a list of buffers to be written to the raw connection.
+// Queue it to the outgoing data fifo.
+//
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist)
+{
+ int count = (int) DEQ_SIZE(*blist);
+ if (count) {
+ qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
+ ZERO(od);
+ od->owning_fifo = fifo;
+ od->buffer_count = (int) DEQ_SIZE(*blist);
+ od->raw_buffers = *blist;
+ DEQ_INIT(*blist);
+
+ DEQ_INSERT_TAIL(fifo->fifo, od);
+ if (!fifo->write_ptr)
+ fifo->write_ptr = od;
+ }
+}
+
+
+// The HTTP encoder has a message body data to be written to the raw connection.
+// Queue it to the outgoing data fifo.
+//
+void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data)
+{
+ int count = qd_message_body_data_buffer_count(body_data);
+ if (count) {
+ qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
+ ZERO(od);
+ od->owning_fifo = fifo;
+ od->body_data = body_data;
+ od->buffer_count = count;
+
+ DEQ_INSERT_TAIL(fifo->fifo, od);
+ if (!fifo->write_ptr)
+ fifo->write_ptr = od;
+ } else {
+ // empty body-data
+ qd_message_body_data_release(body_data);
+ }
+}
+
+
+// Called during proactor event PN_RAW_CONNECTION_WRITTEN
+//
+void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
+{
+ pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
+ size_t count;
+ while ((count = pn_raw_connection_take_written_buffers(hconn->raw_conn, buffers, RAW_BUFFER_BATCH)) != 0) {
+ for (size_t i = 0; i < count; ++i) {
+
+ // keep me, you'll need it
+ if (HTTP1_DUMP_BUFFERS) {
+ char *ptr = (char*) buffers[i].bytes;
+ int len = (int) buffers[i].size;
+ fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d\n value='%.*s'\n",
+ hconn->conn_id, (void*)ptr, len, len, ptr);
+ fflush(stdout);
+ }
+
+ qdr_http1_out_data_t *od = (qdr_http1_out_data_t*) buffers[i].context;
+ assert(od);
+ // Note: according to proton devs the order in which write buffers
+ // are released are NOT guaranteed to be in the same order in which
+ // they were written!
+
+ od->free_count += 1;
+ if (od->free_count == od->buffer_count) {
+ // all buffers returned
+ qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
+ DEQ_REMOVE(fifo->fifo, od);
+ if (od->body_data)
+ qd_message_body_data_release(od->body_data);
+ else
+ qd_buffer_list_free_buffers(&od->raw_buffers);
+ free_qdr_http1_out_data_t(od);
+ }
+ }
+ }
+}
+
+
+//
+// Protocol Adaptor Callbacks
+//
+
+
+// Invoked by the core thread to wake an I/O thread for the connection
+//
+static void _core_connection_activate_CT(void *context, qdr_connection_t *conn)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+ if (!hconn) return;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection activate", hconn->conn_id);
+
+ if (hconn->raw_conn)
+ pn_raw_connection_wake(hconn->raw_conn);
+ else
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] missing raw connection!", hconn->conn_id);
+}
+
+
+static void _core_link_first_attach(void *context,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target,
+ qd_session_class_t ssn_class)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+ if (hconn)
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Link first attach", hconn->conn_id);
+}
+
+
+static void _core_link_second_attach(void *context,
+ qdr_link_t *link,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (!hconn) return;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link second attach", hconn->conn_id, link->identity);
+
+ if (hconn->type == HTTP1_CONN_CLIENT) {
+ qdr_http1_client_core_second_attach((qdr_http1_adaptor_t*) context,
+ hconn, link, source, target);
+ }
+}
+
+
+static void _core_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link detach", hconn->conn_id, link->identity);
+
+ qdr_link_set_context(link, 0);
+ if (link == hconn->out_link)
+ hconn->out_link = 0;
+ else
+ hconn->in_link = 0;
+ }
+}
+
+
+static void _core_link_flow(void *context, qdr_link_t *link, int credit)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link flow (%d)",
+ hconn->conn_id, link->identity, credit);
+ if (hconn->type == HTTP1_CONN_SERVER)
+ qdr_http1_server_core_link_flow((qdr_http1_adaptor_t*) context, hconn, link, credit);
+ else
+ qdr_http1_client_core_link_flow((qdr_http1_adaptor_t*) context, hconn, link, credit);
+ }
+}
+
+
+static void _core_link_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link offer (%d)",
+ hconn->conn_id, link->identity, delivery_count);
+ }
+}
+
+
+static void _core_link_drained(void *context, qdr_link_t *link)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link drained",
+ hconn->conn_id, link->identity);
+ }
+}
+
+
+static void _core_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link drain %s",
+ hconn->conn_id, link->identity,
+ mode ? "ON" : "OFF");
+ }
+}
+
+
+static int _core_link_push(void *context, qdr_link_t *link, int limit)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link push %d", hconn->conn_id, link->identity, limit);
+ return qdr_link_process_deliveries(qdr_http1_adaptor->core, link, limit);
+ }
return 0;
}
-void qd_http1_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *conn) {}
+
+// The I/O thread wants to send this delivery out the link
+//
+static uint64_t _core_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ uint64_t outcome = PN_RELEASED;
+
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Core link deliver %p %s", hconn->conn_id, link->identity,
+ (void*)delivery, settled ? "settled" : "unsettled");
+
+ if (hconn->type == HTTP1_CONN_SERVER)
+ outcome = qdr_http1_server_core_link_deliver(qdr_http1_adaptor, hconn, link, delivery, settled);
+ else
+ outcome = qdr_http1_client_core_link_deliver(qdr_http1_adaptor, hconn, link, delivery, settled);
+ }
+
+ return outcome;
+}
+
+static int _core_link_get_credit(void *context, qdr_link_t *link)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_link_get_context(link);
+ int credit = 0;
+ if (hconn) {
+ credit = (link == hconn->in_link) ? hconn->in_link_credit : hconn->out_link_credit;
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Link get credit (%d)", hconn->conn_id, link->identity, credit);
+ }
+
+ return credit;
+}
+
+
+// Handle disposition/settlement update for the outstanding incoming HTTP message.
+//
+static void _core_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+ qdr_http1_request_base_t *hreq = (qdr_http1_request_base_t*) qdr_delivery_get_context(dlv);
+ if (hreq) {
+ qdr_http1_connection_t *hconn = hreq->hconn;
+ qdr_link_t *link = qdr_delivery_link(dlv);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Core Delivery update disp=0x%"PRIx64" %s",
+ hconn->conn_id, link->identity, disp,
+ settled ? "settled" : "unsettled");
+
+ if (hconn->type == HTTP1_CONN_SERVER)
+ qdr_http1_server_core_delivery_update(qdr_http1_adaptor, hconn, hreq, dlv, disp, settled);
+ else
+ qdr_http1_client_core_delivery_update(qdr_http1_adaptor, hconn, hreq, dlv, disp, settled);
+ }
+}
+
+static void _core_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+ if (hconn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP/1.x closing connection", hconn->conn_id);
+
+ char *qdr_error = error ? qdr_error_description(error) : 0;
+ qdr_http1_close_connection(hconn, qdr_error);
+ qdr_connection_set_context(conn, 0);
+ hconn->qdr_conn = 0;
+ hconn->in_link = hconn->out_link = 0;
+ free(qdr_error);
+ }
+}
+
+
+static void _core_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
+ if (hconn) {
+ hconn->trace = trace;
+ if (trace)
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP/1.x trace enabled", hconn->conn_id);
+ }
+}
+
+
+//
+// Adaptor Setup & Teardown
+//
+
+
+static void qd_http1_adaptor_init(qdr_core_t *core, void **adaptor_context)
+{
+ qdr_http1_adaptor_t *adaptor = NEW(qdr_http1_adaptor_t);
+
+ ZERO(adaptor);
+ adaptor->core = core;
+ adaptor->log = qd_log_source(QD_HTTP_LOG_SOURCE);
+ adaptor->lock = sys_mutex();
+ DEQ_INIT(adaptor->listeners);
+ DEQ_INIT(adaptor->connectors);
+ DEQ_INIT(adaptor->connections);
+ adaptor->adaptor = qdr_protocol_adaptor(core,
+ "http/1.x",
+ adaptor, // context
+ _core_connection_activate_CT, // core thread only
+ _core_link_first_attach,
+ _core_link_second_attach,
+ _core_link_detach,
+ _core_link_flow,
+ _core_link_offer,
+ _core_link_drained,
+ _core_link_drain,
+ _core_link_push,
+ _core_link_deliver,
+ _core_link_get_credit,
+ _core_delivery_update,
+ _core_conn_close,
+ _core_conn_trace);
+ *adaptor_context = adaptor;
+ qdr_http1_adaptor = adaptor;
+}
+
+
+static void qd_http1_adaptor_final(void *adaptor_context)
+{
+ qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context;
+ qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+
+ qd_http_lsnr_t *li = DEQ_HEAD(adaptor->listeners);
+ while (li) {
+ qd_http1_delete_listener(0, li);
+ li = DEQ_HEAD(adaptor->listeners);
+ }
+ qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
+ while (ct) {
+ qd_http1_delete_connector(0, ct);
+ ct = DEQ_HEAD(adaptor->connectors);
+ }
+ qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
+ while (hconn) {
+ qdr_http1_connection_free(hconn);
+ hconn = DEQ_HEAD(adaptor->connections);
+ }
+
+ sys_mutex_free(adaptor->lock);
+ qdr_http1_adaptor = NULL;
+
+ free(adaptor);
+}
+
+
+/**
+ * Declare the adaptor so that it will self-register on process startup.
+ */
+QDR_CORE_ADAPTOR_DECLARE("http1.x-adaptor", qd_http1_adaptor_init, qd_http1_adaptor_final)
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
new file mode 100644
index 0000000..0a06787
--- /dev/null
+++ b/src/adaptors/http1/http1_client.c
@@ -0,0 +1,1306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http1_private.h"
+#include "adaptors/adaptor_utils.h"
+
+#include <proton/listener.h>
+#include <proton/proactor.h>
+
+
+//
+// This file contains code specific to HTTP client processing. The raw
+// connection is terminated at an HTTP client, not an HTTP server.
+//
+
+#define DEFAULT_CAPACITY 250
+#define LISTENER_BACKLOG 16
+
+
+//
+// State for a single response message to be sent to the client via the raw
+// connection.
+//
+typedef struct _client_response_msg_t {
+ DEQ_LINKS(struct _client_response_msg_t);
+
+ qdr_delivery_t *dlv; // from core via core_link_deliver
+ uint64_t dispo; // set by adaptor on encode complete
+ bool headers_encoded; // all headers completely encoded
+ bool encoded; // true when full response encoded
+
+ // HTTP encoded message data
+ qdr_http1_out_data_fifo_t out_data;
+
+} _client_response_msg_t;
+ALLOC_DECLARE(_client_response_msg_t);
+ALLOC_DEFINE(_client_response_msg_t);
+DEQ_DECLARE(_client_response_msg_t, _client_response_msg_list_t);
+
+
+//
+// State for an HTTP/1.x Request+Response exchange, client facing
+//
+typedef struct _client_request_t {
+ qdr_http1_request_base_t base;
+
+ // The request arrives via the raw connection. These fields are used to
+ // build the message and deliver it into the core.
+ //
+ qd_message_t *request_msg; // holds inbound message as it is built
+ qdr_delivery_t *request_dlv; // qdr_link_deliver()
+ qd_composed_field_t *request_props; // holds HTTP headers as they arrive
+ uint64_t request_dispo; // set by core (core_update_delivery)
+ bool request_settled; // set by core (core_update_delivery)
+
+ // A single request may result in more than one response (1xx Continue for
+ // example). These responses are written to the raw connection from HEAD
+ // to TAIL.
+ //
+ _client_response_msg_list_t responses;
+
+ bool codec_completed; // encoder/decoder done
+ bool cancelled;
+ bool close_on_complete; // close the conn when this request is complete
+
+} _client_request_t;
+ALLOC_DECLARE(_client_request_t);
+ALLOC_DEFINE(_client_request_t);
+
+
+static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
+static void _client_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs,
+ const char *method,
+ const char *target,
+ uint32_t version_major,
+ uint32_t version_minor);
+static int _client_rx_response_cb(h1_codec_request_state_t *lib_rs,
+ int status_code,
+ const char *reason_phrase,
+ uint32_t version_major,
+ uint32_t version_minor);
+static int _client_rx_header_cb(h1_codec_request_state_t *lib_rs, const char *key, const char *value);
+static int _client_rx_headers_done_cb(h1_codec_request_state_t *lib_rs, bool has_body);
+static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, size_t offset, uintmax_t len,
+ bool more);
+static void _client_rx_done_cb(h1_codec_request_state_t *lib_rs);
+static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled);
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
+static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg);
+static void _client_request_free(_client_request_t *req);
+static void _client_connection_free(qdr_http1_connection_t *hconn);
+static void _write_pending_response(_client_request_t *req);
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Client Listener
+////////////////////////////////////////////////////////
+
+
+// Listener received connection request from client
+//
+static qdr_http1_connection_t *_create_client_connection(qd_http_lsnr_t *li)
+{
+ qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
+
+ ZERO(hconn);
+ hconn->type = HTTP1_CONN_CLIENT;
+ hconn->qd_server = li->server;
+ hconn->adaptor = qdr_http1_adaptor;
+ hconn->handler_context.handler = &_handle_connection_events;
+ hconn->handler_context.context = hconn;
+
+ hconn->client.next_msg_id = 99383939;
+
+ // configure the HTTP/1.x library
+
+ h1_codec_config_t config = {0};
+ config.type = HTTP1_CONN_CLIENT;
+ config.tx_buffers = _client_tx_buffers_cb;
+ config.tx_body_data = _client_tx_body_data_cb;
+ config.rx_request = _client_rx_request_cb;
+ config.rx_response = _client_rx_response_cb;
+ config.rx_header = _client_rx_header_cb;
+ config.rx_headers_done = _client_rx_headers_done_cb;
+ config.rx_body = _client_rx_body_cb;
+ config.rx_done = _client_rx_done_cb;
+ config.request_complete = _client_request_complete_cb;
+
+ hconn->http_conn = h1_codec_connection(&config, hconn);
+ if (!hconn->http_conn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+ "Failed to initialize HTTP/1.x library - connection refused.");
+ qdr_http1_connection_free(hconn);
+ return 0;
+ }
+
+ hconn->cfg.host = qd_strdup(li->config.host);
+ hconn->cfg.port = qd_strdup(li->config.port);
+ hconn->cfg.address = qd_strdup(li->config.address);
+
+ hconn->raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ // we'll create a QDR connection and links once the raw connection activates
+ return hconn;
+}
+
+
+// Process proactor events for the client listener
+//
+static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+ qd_log_source_t *log = qdr_http1_adaptor->log;
+ qd_http_lsnr_t *li = (qd_http_lsnr_t*) context;
+ const char *host_port = li->config.host_port;
+
+ qd_log(log, QD_LOG_DEBUG, "HTTP/1.x Client Listener Event %s\n", pn_event_type_name(pn_event_type(e)));
+
+ switch (pn_event_type(e)) {
+
+ case PN_LISTENER_OPEN: {
+ qd_log(log, QD_LOG_NOTICE, "Listening for HTTP/1.x client requests on %s", host_port);
+ break;
+ }
+
+ case PN_LISTENER_ACCEPT: {
+ qd_log(log, QD_LOG_INFO, "Accepting HTTP/1.x connection on %s", host_port);
+ qdr_http1_connection_t *hconn = _create_client_connection(li);
+ if (hconn) {
+ // Note: the proactor may schedule the hconn on another thread
+ // during this call!
+ pn_listener_raw_accept(li->pn_listener, hconn->raw_conn);
+ }
+ break;
+ }
+
+ case PN_LISTENER_CLOSE: {
+ if (li->pn_listener) {
+ pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+ if (pn_condition_is_set(cond)) {
+ qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+ pn_condition_get_description(cond),
+ pn_condition_get_name(cond));
+ } else {
+ qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
+ }
+ pn_listener_set_context(li->pn_listener, 0);
+ li->pn_listener = 0;
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+}
+
+
+// Management Agent API - Create
+//
+qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+ qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &_handle_listener_events);
+ if (!li) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http listener: no memory");
+ return 0;
+ }
+ li->config = *config;
+
+ DEQ_ITEM_INIT(li);
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_INSERT_TAIL(qdr_http1_adaptor->listeners, li);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port);
+ // Note: the proactor may schedule the pn_listener on another thread during this call
+ pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, LISTENER_BACKLOG);
+ return li;
+}
+
+
+// Management Agent API - Delete
+//
+void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_lsnr_t *li)
+{
+ if (li) {
+ if (li->pn_listener) {
+ pn_listener_close(li->pn_listener);
+ li->pn_listener = 0;
+ }
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+ qd_http_listener_decref(li);
+ }
+}
+
+
+////////////////////////////////////////////////////////
+// Raw Connector Events
+////////////////////////////////////////////////////////
+
+
+// Raw Connection Initialization
+//
+static void _setup_client_connection(qdr_http1_connection_t *hconn)
+{
+ hconn->client.client_ip_addr = qda_raw_conn_get_address(hconn->raw_conn);
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_INCOMING, //qd_direction_t dir,
+ hconn->client.client_ip_addr, //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "HTTP/1.x Adaptor", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
+ hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
+ qdr_http1_adaptor->adaptor,
+ true, // incoming
+ QDR_ROLE_NORMAL,
+ 1, //cost
+ hconn->conn_id,
+ 0, // label
+ 0, // remote container id
+ false, // strip annotations in
+ false, // strip annotations out
+ false, // allow dynamic link routes
+ false, // allow admin status update
+ DEFAULT_CAPACITY,
+ 0, // vhost
+ info,
+ 0, // bind context
+ 0); // bind token
+ qdr_connection_set_context(hconn->qdr_conn, hconn);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
+
+ // simulate a client subscription for reply-to
+ qdr_terminus_t *dynamic_source = qdr_terminus(0);
+ qdr_terminus_set_dynamic(dynamic_source);
+ hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
+ QD_OUTGOING,
+ dynamic_source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "http1.client.reply-to", //const char *name,
+ 0, //const char *terminus_addr,
+ false, // no-route
+ NULL, // initial delivery
+ &(hconn->out_link_id));
+ qdr_link_set_context(hconn->out_link, hconn);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP client response link created",
+ hconn->conn_id, hconn->out_link_id);
+
+ // simulate a client publisher link to the HTTP server:
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, hconn->cfg.address);
+ hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "http1.client.in", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(hconn->in_link_id));
+ qdr_link_set_context(hconn->in_link, hconn);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP client request link created",
+ hconn->conn_id, hconn->in_link_id);
+
+ // wait until the dynamic reply-to address is returned in the second attach
+ // to grant buffers to the raw connection
+}
+
+
+// Proton Connection Event Handler
+//
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+ qd_log_source_t *log = qdr_http1_adaptor->log;
+
+ qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
+
+ if (!hconn) return;
+
+ switch (pn_event_type(e)) {
+
+ case PN_RAW_CONNECTION_CONNECTED: {
+ _setup_client_connection(hconn);
+ break;
+ }
+ case PN_RAW_CONNECTION_CLOSED_READ:
+ case PN_RAW_CONNECTION_CLOSED_WRITE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for %s", hconn->conn_id,
+ pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
+ ? "reading" : "writing");
+ pn_raw_connection_close(hconn->raw_conn);
+ break;
+ }
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", hconn->conn_id);
+ pn_raw_connection_set_context(hconn->raw_conn, 0);
+ hconn->raw_conn = 0;
+
+ if (hconn->out_link) {
+ qdr_link_set_context(hconn->out_link, 0);
+ qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+ hconn->out_link = 0;
+ }
+ if (hconn->in_link) {
+ qdr_link_set_context(hconn->in_link, 0);
+ qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+ hconn->in_link = 0;
+ }
+ if (hconn->qdr_conn) {
+ qdr_connection_set_context(hconn->qdr_conn, 0);
+ qdr_connection_closed(hconn->qdr_conn);
+ hconn->qdr_conn = 0;
+ }
+
+ _client_connection_free(hconn);
+ return; // hconn no longer valid
+ }
+ case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Need write buffers", hconn->conn_id);
+ _write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests));
+ break;
+ }
+ case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
+ // @TODO(kgiusti): backpressure if no credit
+ if (hconn->client.reply_to_addr && !hconn->close_connection /* && hconn->in_link_credit > 0 */) {
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
+ }
+ break;
+ }
+ case PN_RAW_CONNECTION_WAKE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+ while (qdr_connection_process(hconn->qdr_conn)) {}
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
+ break;
+ }
+ case PN_RAW_CONNECTION_READ: {
+ qd_buffer_list_t blist;
+ uintmax_t length;
+ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+ if (length) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client",
+ hconn->conn_id, hconn->in_link_id, length);
+ hconn->in_http1_octets += length;
+ int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ if (error)
+ qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+ }
+ break;
+ }
+ case PN_RAW_CONNECTION_WRITTEN: {
+ qdr_http1_free_written_buffers(hconn);
+ break;
+ }
+ default:
+ break;
+ }
+
+ // Check the head request for completion and advance to next request if
+ // done.
+
+ // remove me:
+ if (hconn) {
+ _client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request complete????", hconn->conn_id);
+ qd_log(log, QD_LOG_DEBUG, " codec=%s req-dlv=%p resp-dlv=%d req_msg=%p %s",
+ hreq->codec_completed ? "Done" : "Not Done",
+ (void*)hreq->request_dlv,
+ (int)DEQ_SIZE(hreq->responses),
+ (void*)hreq->request_msg,
+ hreq->cancelled ? "Cancelled" : "Not Cancelled");
+ }
+ }
+
+ // check if the head request is done
+
+ bool need_close = false;
+ _client_request_t *hreq = (_client_request_t *)(hconn ? DEQ_HEAD(hconn->requests) : 0);
+ if (hreq) {
+ // Can we retire the current outgoing response messages?
+ _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg &&
+ rmsg->encoded &&
+ DEQ_IS_EMPTY(rmsg->out_data.fifo)) {
+ // encoded ==> entire message received from core and
+ // dispo and settlement updated to core.
+ // empty out_data ==> all outstanding raw buffers sent
+ _client_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
+
+ if (hreq->codec_completed &&
+ DEQ_IS_EMPTY(hreq->responses) &&
+ hreq->request_settled) {
+
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+
+ need_close = hreq->close_on_complete;
+ _client_request_free(hreq);
+ }
+ }
+
+ if (need_close)
+ qdr_http1_close_connection(hconn, "Connection: close");
+ else {
+ hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+
+ if (hreq->request_msg && hconn->in_link_credit > 0) {
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+ hconn->conn_id, hconn->in_link_id);
+
+ hconn->in_link_credit -= 1;
+ hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+ qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+ qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+ hreq->request_msg = 0;
+ }
+
+ _write_pending_response(hreq);
+ }
+ }
+}
+
+
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Encoder/Decoder Callbacks
+////////////////////////////////////////////////////////
+
+
+// Encoder callback: send blist buffers (response msg) to client endpoint
+//
+static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ if (!hconn->raw_conn) {
+ // client connection has been lost
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%i] Discarding outgoing data - client connection closed", hconn->conn_id);
+ qd_buffer_list_free_buffers(blist);
+ return;
+ }
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
+ hconn->conn_id, hconn->out_link_id, len);
+
+ // responses are decoded one at a time - the current response it at the
+ // tail of the response list
+
+ _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+ assert(rmsg);
+ qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
+
+ // if this happens to be the current outgoing response try writing to the
+ // raw connection
+
+ if (rmsg == DEQ_HEAD(hreq->responses))
+ _write_pending_response(hreq);
+}
+
+
+// Encoder callback: send body_data buffers (response msg) to client endpoint
+//
+static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ if (!hconn->raw_conn) {
+ // client connection has been lost
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%i] Discarding outgoing data - client connection closed", hconn->conn_id);
+ qd_message_body_data_release(body_data);
+ return;
+ }
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Sending body data to client",
+ hconn->conn_id, hconn->out_link_id);
+
+ // responses are decoded one at a time - the current response it at the
+ // tail of the response list
+
+ _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+ assert(rmsg);
+ qdr_http1_enqueue_body_data(&rmsg->out_data, body_data);
+
+ // if this happens to be the current outgoing response try writing to the
+ // raw connection
+
+ if (rmsg == DEQ_HEAD(hreq->responses))
+ _write_pending_response(hreq);
+}
+
+
+// Called when decoding an HTTP request from a client. This indicates the
+// start of a new request message.
+//
+static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
+ const char *method,
+ const char *target,
+ uint32_t version_major,
+ uint32_t version_minor)
+{
+ h1_codec_connection_t *h1c = h1_codec_request_state_get_connection(hrs);
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)h1_codec_connection_get_context(h1c);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP request received: method=%s target=%s version=%"PRIi32".%"PRIi32,
+ hconn->conn_id, method, target, version_major, version_minor);
+
+ _client_request_t *creq = new__client_request_t();
+ ZERO(creq);
+ creq->base.msg_id = hconn->client.next_msg_id++;
+ creq->base.lib_rs = hrs;
+ creq->base.hconn = hconn;
+ DEQ_INIT(creq->responses);
+
+ creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+ qd_compose_start_map(creq->request_props);
+ {
+ // OASIS specifies this value as "1.1" by default...
+ char version[64];
+ snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
+ qd_compose_insert_symbol(creq->request_props, REQUEST_HEADER_KEY);
+ qd_compose_insert_string(creq->request_props, version);
+
+ qd_compose_insert_symbol(creq->request_props, TARGET_HEADER_KEY);
+ qd_compose_insert_string(creq->request_props, target);
+ }
+
+ h1_codec_request_state_set_context(hrs, (void*) creq);
+ DEQ_INSERT_TAIL(hconn->requests, &creq->base);
+ return 0;
+}
+
+
+// Cannot happen for a client connection!
+static int _client_rx_response_cb(h1_codec_request_state_t *hrs,
+ int status_code,
+ const char *reason_phrase,
+ uint32_t version_major,
+ uint32_t version_minor)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+ "[C%"PRIu64"][L%"PRIu64"] Spurious HTTP response received from client",
+ hconn->conn_id, hconn->in_link_id);
+ return HTTP1_STATUS_BAD_REQ;
+}
+
+
+// called for each decoded HTTP header.
+//
+static int _client_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request header received: key='%s' value='%s'",
+ hconn->conn_id, hconn->in_link_id, key, value);
+
+ if (strcasecmp(key, "connection") == 0) {
+ // We need to filter the connection header out. See if client
+ // requested 'close' - this means it expects us to close the connection
+ // when the response has been sent
+ //
+ // @TODO(kgiusti): also have to remove other headers given in value!
+ //
+ size_t len;
+ const char *token = qdr_http1_token_list_next(value, &len, &value);
+ while (token) {
+ if (len == 5 && strncasecmp(token, "close", 5) == 0) {
+ hreq->close_on_complete = true;
+ break;
+ }
+ token = qdr_http1_token_list_next(value, &len, &value);
+ }
+
+ } else {
+ qd_compose_insert_symbol(hreq->request_props, key);
+ qd_compose_insert_string(hreq->request_props, value);
+ }
+
+ return 0;
+}
+
+
+// Called after the last header is decoded, before decoding any body data.
+// At this point there is enough data to start forwarding the message to
+// the router.
+//
+static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request headers done.",
+ hconn->conn_id, hconn->in_link_id);
+
+ // now that all the headers have been received we can construct
+ // the AMQP message
+
+ hreq->request_msg = qd_message();
+
+ qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(hdrs);
+ qd_compose_insert_bool(hdrs, 0); // durable
+ qd_compose_insert_null(hdrs); // priority
+ //qd_compose_insert_null(hdrs); // ttl
+ //qd_compose_insert_bool(hdrs, 0); // first-acquirer
+ //qd_compose_insert_uint(hdrs, 0); // delivery-count
+ qd_compose_end_list(hdrs);
+
+ qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
+ qd_compose_start_list(props);
+
+ qd_compose_insert_ulong(props, hreq->base.msg_id); // message-id
+ qd_compose_insert_null(props); // user-id
+ // @TODO(kgiusti) set to: to target?
+ qd_compose_insert_string(props, hconn->cfg.address); // to
+ qd_compose_insert_string(props, h1_codec_request_state_method(hrs)); // subject
+ qd_compose_insert_string(props, hconn->client.reply_to_addr); // reply-to
+ qd_compose_end_list(props);
+
+ qd_compose_end_map(hreq->request_props);
+
+ if (!has_body) {
+ // @TODO(kgiusti): fixme: tack on an empty body data performative. The
+ // message decoder will barf otherwise
+ qd_buffer_list_t empty = DEQ_EMPTY;
+ hreq->request_props = qd_compose(QD_PERFORMATIVE_BODY_DATA, hreq->request_props);
+ qd_compose_insert_binary_buffers(hreq->request_props, &empty);
+ }
+
+ qd_message_compose_3(hreq->request_msg, props, hreq->request_props, !has_body);
+ qd_compose_free(props);
+ qd_compose_free(hreq->request_props);
+ hreq->request_props = 0;
+
+ // Use up one credit to obtain a delivery and forward to core. If no
+ // credit is available the request is stalled until the core grants more
+ // flow.
+ if (hreq == (_client_request_t*) DEQ_HEAD(hconn->requests) && hconn->in_link_credit > 0) {
+ hconn->in_link_credit -= 1;
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+ hconn->conn_id, hconn->in_link_id);
+
+ hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+ qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+ qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+ hreq->request_msg = 0;
+ }
+
+ return 0;
+}
+
+
+// Called with decoded body data. This may be called multiple times as body
+// data becomes available.
+//
+static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+ bool more)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
+ hconn->conn_id, hconn->in_link_id, len);
+
+ if (offset) {
+ // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
+ // Remove the offset by shifting the content of the head buffer forward
+ //
+ qd_buffer_t *head = DEQ_HEAD(*body);
+ memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
+ head->size -= offset;
+ }
+
+ //
+ // Compose a DATA performative for this section of the stream
+ //
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary_buffers(field, body);
+
+ //
+ // Extend the streaming message and free the composed field
+ //
+ qd_message_extend(msg, field);
+ qd_compose_free(field);
+
+ //
+ // Notify the router that more data is ready to be pushed out on the delivery
+ //
+ if (!more)
+ qd_message_set_receive_complete(msg);
+
+ if (hreq->request_dlv)
+ qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
+
+ return 0;
+}
+
+
+// Called at the completion of request message decoding.
+//
+static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request receive complete.",
+ hconn->conn_id, hconn->in_link_id);
+
+ if (!qd_message_receive_complete(msg)) {
+ qd_message_set_receive_complete(msg);
+ if (hreq->request_dlv) {
+ qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
+ }
+ }
+}
+
+
+// The coded has completed processing the request and response messages.
+//
+static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled)
+{
+ _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(lib_rs);
+ if (hreq) {
+ hreq->base.lib_rs = 0; // freed on return from this call
+ hreq->cancelled = hreq->cancelled || cancelled;
+ hreq->codec_completed = !hreq->cancelled;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP request/response %s.", hreq->base.hconn->conn_id,
+ cancelled ? "cancelled!" : "codec done");
+ }
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// Router Protocol Adapter Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+void qdr_http1_client_core_second_attach(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
+{
+ if (link == hconn->out_link) {
+ // this is the reply-to link for the client
+ qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
+ hconn->client.reply_to_addr = (char*) qd_iterator_copy(reply_iter);
+
+ assert(hconn->client.reply_to_addr);
+
+ hconn->out_link_credit += DEFAULT_CAPACITY;
+ qdr_link_flow(adaptor->core, link, DEFAULT_CAPACITY, false);
+ }
+}
+
+
+void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ int credit)
+{
+ qd_log(adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Credit granted on request link %d",
+ hconn->conn_id, hconn->in_link_id, credit);
+
+ assert(link == hconn->in_link); // router only grants flow on incoming link
+
+ hconn->in_link_credit += credit;
+ if (hconn->in_link_credit > 0) {
+
+ if (hconn->raw_conn) {
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
+ }
+
+ // is the current request message blocked by lack of credit?
+
+ _client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
+ if (hreq && hreq->request_msg) {
+ assert(!hreq->request_dlv);
+ hconn->in_link_credit -= 1;
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
+ hconn->conn_id, hconn->in_link_id);
+
+ hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
+ qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+ qdr_delivery_incref(hreq->request_dlv, "referenced by HTTP1 adaptor");
+ hreq->request_msg = 0;
+ }
+ }
+}
+
+
+// Handle disposition/settlement update for the outstanding request msg
+//
+void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_http1_request_base_t *req,
+ qdr_delivery_t *dlv,
+ uint64_t disp,
+ bool settled)
+{
+ _client_request_t *hreq = (_client_request_t *)req;
+ assert(dlv == hreq->request_dlv);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request delivery update, outcome=0x%"PRIx64"%s",
+ hconn->conn_id, hconn->in_link_id, disp, settled ? " settled" : "");
+
+ if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
+ // terminal disposition
+ hreq->request_dispo = disp;
+ if (disp != PN_ACCEPTED) {
+ // no response message is going to arrive. Now what? For now fake
+ // a response from the server by using the codec to write an error
+ // response on the behalf of the server.
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64,
+ hconn->conn_id, hconn->in_link_id, disp);
+
+ _client_response_msg_t *rmsg = new__client_response_msg_t();
+ ZERO(rmsg);
+ DEQ_INIT(rmsg->out_data.fifo);
+ DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+ if (disp == PN_REJECTED) {
+ qdr_http1_error_response(&hreq->base, 400, "Bad Request");
+ } else {
+ // total guess as to what the proper error code should be
+ qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+ }
+ }
+ }
+
+ hreq->request_settled = settled || hreq->request_settled;
+}
+
+
+//
+// Response message forwarding
+//
+
+
+// use the correlation ID from the AMQP message containing the response to look
+// up the original request context
+//
+static _client_request_t *_lookup_request_context(qdr_http1_connection_t *hconn,
+ qd_message_t *msg)
+{
+ qdr_http1_request_base_t *req = 0;
+
+ qd_parsed_field_t *cid_pf = 0;
+ qd_iterator_t *cid_iter = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
+ if (cid_iter) {
+ cid_pf = qd_parse(cid_iter);
+ if (cid_pf && qd_parse_ok(cid_pf)) {
+ uint64_t cid = qd_parse_as_ulong(cid_pf);
+ if (qd_parse_ok(cid_pf)) {
+ req = DEQ_HEAD(hconn->requests);
+ while (req) {
+ if (req->msg_id == cid)
+ break;
+ req = DEQ_NEXT(req);
+ }
+ }
+ }
+ }
+
+ qd_parse_free(cid_pf);
+ qd_iterator_free(cid_iter);
+
+ return (_client_request_t*) req;
+}
+
+
+// Encode the response status and all HTTP headers.
+// The message has been validated to app properties depth
+//
+static bool _encode_response_headers(_client_request_t *hreq,
+ _client_response_msg_t *rmsg)
+{
+ bool ok = false;
+ qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+ qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+ if (app_props_iter) {
+ qd_parsed_field_t *app_props = qd_parse(app_props_iter);
+ if (app_props && qd_parse_is_map(app_props)) {
+ qd_parsed_field_t *tmp = qd_parse_value_by_key(app_props, STATUS_HEADER_KEY);
+ if (tmp) {
+ int32_t status_code = qd_parse_as_int(tmp);
+ if (qd_parse_ok(tmp)) {
+
+ // the value for RESPONSE_HEADER_KEY is optional and is set
+ // to a string representation of the version of the server
+ // (e.g. "1.1"
+ uint32_t major = 1;
+ uint32_t minor = 1;
+ tmp = qd_parse_value_by_key(app_props, RESPONSE_HEADER_KEY);
+ if (tmp) {
+ char *version_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
+ if (version_str) {
+ sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
+ free(version_str);
+ }
+ }
+ char *reason_str = 0;
+ tmp = qd_parse_value_by_key(app_props, REASON_HEADER_KEY);
+ if (tmp) {
+ reason_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
+ }
+
+ qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding response %d %s",
+ hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, (int)status_code,
+ reason_str ? reason_str : "");
+
+ ok = !h1_codec_tx_response(hreq->base.lib_rs, (int)status_code, reason_str, major, minor);
+ free(reason_str);
+
+ // now send all headers in app properties
+ qd_parsed_field_t *key = qd_field_first_child(app_props);
+ while (ok && key) {
+ qd_parsed_field_t *value = qd_field_next_child(key);
+ if (!value)
+ break;
+
+ qd_iterator_t *i_key = qd_parse_raw(key);
+ if (!i_key)
+ break;
+
+ // ignore the special headers added by the mapping
+ if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
+ qd_iterator_t *i_value = qd_parse_raw(value);
+ if (!i_value)
+ break;
+
+ char *header_key = (char*) qd_iterator_copy(i_key);
+ char *header_value = (char*) qd_iterator_copy(i_value);
+
+
+ qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding response header %s:%s",
+ hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
+ header_key, header_value);
+
+ ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
+
+ free(header_key);
+ free(header_value);
+ }
+
+ key = qd_field_next_child(value);
+ }
+ }
+ }
+ }
+ qd_parse_free(app_props);
+ qd_iterator_free(app_props_iter);
+ }
+
+ return ok;
+}
+
+
+static uint64_t _encode_response_message(_client_request_t *hreq,
+ _client_response_msg_t *rmsg)
+{
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+ qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
+
+ if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+ return 0;
+
+ if (status == QD_MESSAGE_DEPTH_INVALID) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+
+ assert(status == QD_MESSAGE_DEPTH_OK);
+
+ if (!rmsg->headers_encoded) {
+ rmsg->headers_encoded = true;
+ if (!_encode_response_headers(hreq, rmsg)) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] message headers malformed - discarding.",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+ }
+
+ qd_message_body_data_t *body_data = 0;
+
+ while (true) {
+ switch (qd_message_next_body_data(msg, &body_data)) {
+
+ case QD_MESSAGE_BODY_DATA_OK:
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
+ hconn->conn_id, hconn->out_link_id);
+
+ if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+ break;
+
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
+ // indicate this message is complete
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_ACCEPTED;
+
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ return 0; // wait for more
+
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+ }
+}
+
+
+// The I/O thread wants to send this delivery containing the response out the
+// link. It is unlikely that the parsing of this message will fail since the
+// message was constructed by the ingress router. However if the message fails
+// to parse then there is probably no recovering as the client will now be out
+// of sync. For now close the connection if an error occurs.
+//
+uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_delivery_t *delivery,
+ bool settled)
+{
+ qd_message_t *msg = qdr_delivery_message(delivery);
+ _client_request_t *hreq = (_client_request_t*) qdr_delivery_get_context(delivery);
+ if (!hreq) {
+ // new delivery - look for corresponding request via correlation_id
+ switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
+ case QD_MESSAGE_DEPTH_INCOMPLETE:
+ return 0;
+
+ case QD_MESSAGE_DEPTH_INVALID:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
+ hconn->conn_id, link->identity);
+ qd_message_set_send_complete(msg);
+ qdr_http1_close_connection(hconn, "Malformed response message");
+ return PN_REJECTED;
+
+ case QD_MESSAGE_DEPTH_OK:
+ hreq = _lookup_request_context(hconn, msg);
+ if (!hreq) {
+ // No corresponding request found
+ // @TODO(kgiusti) how to handle this? - simply discard?
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
+ qd_message_set_send_complete(msg);
+ qdr_http1_close_connection(hconn, "Cannot correlate response message");
+ return PN_REJECTED;
+ }
+
+ // link request state and delivery
+ _client_response_msg_t *rmsg = new__client_response_msg_t();
+ ZERO(rmsg);
+ rmsg->dlv = delivery;
+ DEQ_INIT(rmsg->out_data.fifo);
+ qdr_delivery_set_context(delivery, hreq);
+ qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
+ DEQ_INSERT_TAIL(hreq->responses, rmsg);
+ break;
+ }
+ }
+
+ // deliveries arrive one at a time and are added to the tail
+ _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+ assert(rmsg && rmsg->dlv == delivery);
+
+ if (!rmsg->dispo)
+ rmsg->dispo = _encode_response_message(hreq, rmsg);
+
+ if (rmsg->dispo && qd_message_receive_complete(msg)) { // encode of message complete
+ rmsg->encoded = true;
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP client settling response, dispo=0x%"PRIx64,
+ hconn->conn_id, hconn->out_link_id, rmsg->dispo);
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ rmsg->dlv,
+ rmsg->dispo,
+ true, // settled,
+ 0, // error
+ 0, // dispo data
+ false);
+ if (rmsg->dispo == PN_ACCEPTED) {
+ bool need_close = false;
+ h1_codec_tx_done(hreq->base.lib_rs, &need_close);
+ hreq->close_on_complete = need_close || hreq->close_on_complete;
+ } else {
+ // The response was bad. There's not much that can be done to
+ // recover, so for now I punt...
+ qdr_http1_close_connection(hconn, "Cannot parse response message");
+ }
+ }
+
+ return 0;
+}
+
+
+//
+// Misc
+//
+
+
+// free the response message
+//
+static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
+{
+ DEQ_REMOVE(req->responses, rmsg);
+ if (rmsg->dlv) {
+ qdr_delivery_set_context(rmsg->dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 adaptor response settled");
+ }
+
+ qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
+
+ free__client_response_msg_t(rmsg);
+}
+
+
+// Check the head response message for buffers that need to be sent
+//
+static void _write_pending_response(_client_request_t *hreq)
+{
+ if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+ assert(DEQ_PREV(&hreq->base) == 0); // must preserve order
+ _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ if (rmsg && rmsg->out_data.write_ptr) {
+ uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%i] %"PRIu64" octets written",
+ hreq->base.hconn->conn_id, written);
+ }
+ }
+}
+
+
+static void _client_request_free(_client_request_t *hreq)
+{
+ if (hreq) {
+ qdr_http1_request_base_cleanup(&hreq->base);
+ qd_message_free(hreq->request_msg);
+ if (hreq->request_dlv) {
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+ }
+ qd_compose_free(hreq->request_props);
+
+ _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg) {
+ _client_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
+
+ free__client_request_t(hreq);
+ }
+}
+
+
+static void _client_connection_free(qdr_http1_connection_t *hconn)
+{
+ for (_client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
+ hreq;
+ hreq = (_client_request_t*) DEQ_HEAD(hconn->requests)) {
+ _client_request_free(hreq);
+ }
+ qdr_http1_connection_free(hconn);
+}
diff --git a/src/adaptors/http1/http1_lib.c b/src/adaptors/http1/http1_codec.c
similarity index 66%
rename from src/adaptors/http1/http1_lib.c
rename to src/adaptors/http1/http1_codec.c
index 102a1aa..bec96fa 100644
--- a/src/adaptors/http1/http1_lib.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -18,7 +18,7 @@
*
*/
-#include <qpid/dispatch/http1_lib.h>
+#include <qpid/dispatch/http1_codec.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/buffer.h>
@@ -27,6 +27,17 @@
#include <ctype.h>
#include <stdio.h>
#include <string.h>
+#include <assert.h>
+
+
+//
+// This file contains code for encoding/decoding an HTTP/1.x data stream. See
+// http1_codec.h for details.
+//
+
+
+// @TODO(kgiusti)
+// - properly set 'more' flag on rx_body() callback
const uint8_t CR_TOKEN = '\r';
@@ -38,6 +49,17 @@ const qd_iterator_pointer_t NULL_I_PTR = {0};
// true for informational response codes
#define IS_INFO_RESPONSE(code) ((code) / 100 == 1)
+// true if response code indicates that the response will NOT contain a body
+// 204 = No Content
+// 205 = Reset Content
+// 304 = Not Modified
+#define NO_BODY_RESPONSE(code) \
+ ((code) == 204 || \
+ (code) == 205 || \
+ (code) == 304 || \
+ IS_INFO_RESPONSE(code))
+
+
typedef enum {
HTTP1_MSG_STATE_START = 0, // parsing start-line
HTTP1_MSG_STATE_HEADERS, // parsing headers
@@ -53,37 +75,49 @@ typedef enum {
} http1_chunk_state_t;
-typedef struct scratch_buffer_t {
+typedef struct scratch_memory_t {
uint8_t *buf;
- size_t size; // of buffer, not contents!
-} scratch_buffer_t;
+ size_t size; // of allocated memory, not contents!
+} scratch_memory_t;
-// state for a single request-response transaction
+// State for a single request-response transaction.
//
-struct http1_transfer_t {
- DEQ_LINKS(struct http1_transfer_t);
- void *context;
- http1_conn_t *conn;
- uint32_t response_code;
-
- bool close_on_done; // true if connection must be closed when transfer completes
- bool is_head_method; // true if request method is HEAD
- bool is_connect_method; // true if request method is CONNECT TODO(kgiusti): supported?
+// A new state is created when a request starts (either via the rx_request
+// callback in the case of client connections or the h1_codec_tx_request() call
+// for server connections).
+//
+// For a connection to a server the rx_response callbacks will occur in the same
+// order as h1_codec_tx_request calls are made.
+//
+// For a connection to a client the caller must ensure that calls to
+// h1_codec_tx_response() must be made in the same order as rx_request callbacks
+// occur.
+//
+struct h1_codec_request_state_t {
+ DEQ_LINKS(struct h1_codec_request_state_t);
+ void *context;
+ h1_codec_connection_t *conn;
+ char *method;
+ uint32_t response_code;
+
+ bool no_body_method; // true if request method is either HEAD or CONNECT
+ bool request_complete; // true when request message done encoding/decoding
+ bool response_complete; // true when response message done encoding/decoding
};
-DEQ_DECLARE(http1_transfer_t, http1_transfer_list_t);
-ALLOC_DECLARE(http1_transfer_t);
-ALLOC_DEFINE(http1_transfer_t);
+DEQ_DECLARE(h1_codec_request_state_t, h1_codec_request_state_list_t);
+ALLOC_DECLARE(h1_codec_request_state_t);
+ALLOC_DEFINE(h1_codec_request_state_t);
// The HTTP/1.1 connection
//
-struct http1_conn_t {
+struct h1_codec_connection_t {
void *context;
// http requests are added to tail,
// in-progress response is at head
- http1_transfer_list_t xfers;
+ h1_codec_request_state_list_t hrs_queue;
// Decoder for current incoming msg.
//
@@ -103,19 +137,18 @@ struct http1_conn_t {
qd_iterator_pointer_t read_ptr;
qd_iterator_pointer_t body_ptr;
- http1_transfer_t *xfer; // current transfer
- http1_msg_state_t state;
- scratch_buffer_t scratch;
- int error;
- const char *error_msg;
+ h1_codec_request_state_t *hrs; // current request/response
+ http1_msg_state_t state;
+ scratch_memory_t scratch;
+ const char *error_msg;
+ int error;
- uint64_t content_length;
+ intmax_t content_length;
http1_chunk_state_t chunk_state;
uint64_t chunk_length;
bool is_request;
bool is_chunked;
- bool is_1_0;
// decoded headers
bool hdr_transfer_encoding;
@@ -135,53 +168,57 @@ struct http1_conn_t {
struct encoder_t {
qd_buffer_list_t outgoing;
qd_iterator_pointer_t write_ptr;
- http1_transfer_t *xfer; // current transfer
+ h1_codec_request_state_t *hrs; // current request/response state
+ bool headers_sent; // true after all headers have been sent
bool is_request;
- bool crlf_sent; // true if the CRLF after headers has been sent
+ bool is_chunked;
+
+ // headers provided
+ bool hdr_content_length;
} encoder;
- http1_conn_config_t config;
+ h1_codec_config_t config;
};
-ALLOC_DECLARE(http1_conn_t);
-ALLOC_DEFINE(http1_conn_t);
+ALLOC_DECLARE(h1_codec_connection_t);
+ALLOC_DEFINE(h1_codec_connection_t);
static void decoder_reset(struct decoder_t *d);
static void encoder_reset(struct encoder_t *e);
-// Create a new transfer - this is done when a new http request occurs
-// Keep oldest outstanding tranfer at DEQ_HEAD(conn->xfers)
-static http1_transfer_t *http1_transfer(http1_conn_t *conn)
+// Create a new request state - this is done when a new http request occurs
+// Keep oldest outstanding tranfer at DEQ_HEAD(conn->hrs_queue)
+static h1_codec_request_state_t *h1_codec_request_state(h1_codec_connection_t *conn)
{
- http1_transfer_t *xfer = new_http1_transfer_t();
- ZERO(xfer);
- xfer->conn = conn;
- DEQ_INSERT_TAIL(conn->xfers, xfer);
- return xfer;
+ h1_codec_request_state_t *hrs = new_h1_codec_request_state_t();
+ ZERO(hrs);
+ hrs->conn = conn;
+ DEQ_INSERT_TAIL(conn->hrs_queue, hrs);
+ return hrs;
}
-static void http1_transfer_free(http1_transfer_t *xfer)
+static void h1_codec_request_state_free(h1_codec_request_state_t *hrs)
{
- if (xfer) {
- http1_conn_t *conn = xfer->conn;
- assert(conn->decoder.xfer != xfer);
- assert(conn->encoder.xfer != xfer);
- DEQ_REMOVE(conn->xfers, xfer);
- free_http1_transfer_t(xfer);
+ if (hrs) {
+ h1_codec_connection_t *conn = hrs->conn;
+ assert(conn->decoder.hrs != hrs);
+ assert(conn->encoder.hrs != hrs);
+ DEQ_REMOVE(conn->hrs_queue, hrs);
+ free_h1_codec_request_state_t(hrs);
}
}
-http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
+h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context)
{
- http1_conn_t *conn = new_http1_conn_t();
+ h1_codec_connection_t *conn = new_h1_codec_connection_t();
ZERO(conn);
conn->context = context;
conn->config = *config;
- DEQ_INIT(conn->xfers);
+ DEQ_INIT(conn->hrs_queue);
encoder_reset(&conn->encoder);
DEQ_INIT(conn->encoder.outgoing);
@@ -195,46 +232,44 @@ http1_conn_t *http1_connection(http1_conn_config_t *config, void *context)
}
-// Close the connection conn.
-//
-// This cancels all outstanding transfers and destroys the connection. If
-// there is an incoming response message body being parsed when this function
-// is invoked it will signal the end of the message.
+// The connection has closed. If this is a connection to a server this may
+// simply be the end of the response message. If so mark it complete.
//
-void http1_connection_close(http1_conn_t *conn)
+void h1_codec_connection_closed(h1_codec_connection_t *conn)
{
if (conn) {
- struct decoder_t *decoder = &conn->decoder;
- if (!decoder->error) {
- if (decoder->state == HTTP1_MSG_STATE_BODY
- && decoder->xfer) {
-
- // terminate the incoming message as the server has closed the
- // connection to indicate the end of the message
- conn->config.xfer_rx_done(decoder->xfer);
- }
-
- // notify any outstanding transfers
- http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
- while (xfer) {
- conn->config.xfer_done(xfer);
- xfer = DEQ_NEXT(xfer);
+ if (conn->config.type == HTTP1_CONN_SERVER) {
+ struct decoder_t *decoder = &conn->decoder;
+ if (decoder->hrs &&
+ decoder->hrs->request_complete &&
+ decoder->state == HTTP1_MSG_STATE_BODY) {
+
+ h1_codec_request_state_t *hrs = decoder->hrs;
+ decoder_reset(decoder);
+ hrs->response_complete = true;
+ conn->config.rx_done(hrs);
+ conn->config.request_complete(hrs, false);
+ h1_codec_request_state_free(hrs);
}
}
+ }
+}
+
+// Free the connection
+//
+void h1_codec_connection_free(h1_codec_connection_t *conn)
+{
+ if (conn) {
+ // expect application to cancel all requests!
+ assert(DEQ_IS_EMPTY(conn->hrs_queue));
decoder_reset(&conn->decoder);
encoder_reset(&conn->encoder);
qd_buffer_list_free_buffers(&conn->decoder.incoming);
qd_buffer_list_free_buffers(&conn->encoder.outgoing);
free(conn->decoder.scratch.buf);
- http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
- while (xfer) {
- http1_transfer_free(xfer); // removes from conn->xfers list
- xfer = DEQ_HEAD(conn->xfers);
- }
-
- free_http1_conn_t(conn);
+ free_h1_codec_connection_t(conn);
}
}
@@ -247,29 +282,57 @@ static void decoder_reset(struct decoder_t *decoder)
// track the current position in the incoming data stream
decoder->body_ptr = NULL_I_PTR;
- decoder->xfer = 0;
+ decoder->hrs = 0;
decoder->state = HTTP1_MSG_STATE_START;
decoder->content_length = 0;
decoder->chunk_state = HTTP1_CHUNK_HEADER;
decoder->chunk_length = 0;
decoder->error = 0;
decoder->error_msg = 0;
-
decoder->is_request = false;
decoder->is_chunked = false;
- decoder->is_1_0 = false;
-
decoder->hdr_transfer_encoding = false;
decoder->hdr_content_length = false;
}
+
// reset the tx encoder after message sent
static void encoder_reset(struct encoder_t *encoder)
{
// do not touch the write_ptr or the outgoing queue as there may be more messages to send.
- encoder->xfer = 0;
+ encoder->hrs = 0;
+ encoder->headers_sent = false;
encoder->is_request = false;
- encoder->crlf_sent = false;
+ encoder->is_chunked = false;
+ encoder->hdr_content_length = false;
+}
+
+
+// convert a string representation of a Content-Length value to
+// and integer. Return true if parse succeeds
+//
+static bool _parse_content_length(const char *clen, intmax_t *value)
+{
+ // a valid value is an integer >= 0
+ *value = 0;
+ return sscanf(clen, "%"PRIdMAX, value) == 1 && *value > -1;
+}
+
+
+// Scan the value of a Transfer-Encoding header to see if the
+// last encoding is chunked
+//
+static bool _is_transfer_chunked(const char *encoding)
+{
+ // "chunked" must be the last item in the value string. And remember kids:
+ // coding type names are case insensitive!
+ //
+ size_t len = strlen(encoding);
+ if (len >= 7) { // 7 = strlen("chunked")
+ const char *ptr = encoding + len - 7;
+ return strcasecmp("chunked", ptr) == 0;
+ }
+ return false;
}
@@ -364,8 +427,11 @@ static bool is_empty_line(const qd_iterator_pointer_t *line)
return false;
}
+
+// for debug:
static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_pointer_t *ptr)
{
+#if 0
qd_iterator_pointer_t tmp = *ptr;
fprintf(stdout, "%s '", prefix);
size_t len = MIN(tmp.remaining, 80);
@@ -375,6 +441,7 @@ static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_p
}
fprintf(stdout, "%s'\n", (tmp.remaining) ? " <truncated>" : "");
fflush(stdout);
+#endif
}
@@ -414,7 +481,7 @@ static bool read_line(qd_iterator_pointer_t *data, qd_iterator_pointer_t *line)
}
-static bool ensure_scratch_size(scratch_buffer_t *b, size_t required)
+static bool ensure_scratch_size(scratch_memory_t *b, size_t required)
{
if (b->size < required) {
if (b->buf)
@@ -514,7 +581,7 @@ static bool parse_field(qd_iterator_pointer_t *line, qd_iterator_pointer_t *fiel
// parse the HTTP/1.1 request line:
// "method SP request-target SP HTTP-version CRLF"
//
-static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+static bool parse_request_line(h1_codec_connection_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
{
qd_iterator_pointer_t method = {0};
qd_iterator_pointer_t target = {0};
@@ -561,21 +628,20 @@ static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd
return decoder->error;
}
- http1_transfer_t *xfer = http1_transfer(conn);
+ h1_codec_request_state_t *hrs = h1_codec_request_state(conn);
// check for methods that do not support body content in the response:
- if (strcmp((char*)method_str, "HEAD") == 0)
- xfer->is_head_method = true;
- else if (strcmp((char*)method_str, "CONNECT") == 0)
- xfer->is_connect_method = true;
+ hrs->no_body_method = (strcmp((char*)method_str, "HEAD") == 0 ||
+ strcmp((char*)method_str, "CONNECT") == 0);
+
+ hrs->method = qd_strdup((char*) method_str);
- decoder->xfer = xfer;
+ decoder->hrs = hrs;
decoder->is_request = true;
- decoder->is_1_0 = (minor == 0);
- decoder->error = conn->config.xfer_rx_request(xfer, (char*)method_str, (char*)target_str, (char*)version_str);
+ decoder->error = conn->config.rx_request(hrs, (char*)method_str, (char*)target_str, major, minor);
if (decoder->error)
- decoder->error_msg = "xfer_rx_request callback error";
+ decoder->error_msg = "hrs_rx_request callback error";
return decoder->error;
}
@@ -583,7 +649,7 @@ static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd
// parse the HTTP/1.1 response line
// "HTTP-version SP status-code [SP reason-phrase] CRLF"
//
-static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
+static int parse_response_line(h1_codec_connection_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line)
{
qd_iterator_pointer_t version = {0};
qd_iterator_pointer_t status_code = {0};
@@ -599,23 +665,23 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
}
// Responses arrive in the same order as requests are generated so this new
- // response corresponds to head xfer
- http1_transfer_t *xfer = DEQ_HEAD(conn->xfers);
- if (!xfer) {
+ // response corresponds to head hrs
+ h1_codec_request_state_t *hrs = DEQ_HEAD(conn->hrs_queue);
+ if (!hrs) {
// receiving a response without a corresponding request
decoder->error_msg = "Spurious HTTP response received";
decoder->error = HTTP1_STATUS_SERVER_ERR;
return decoder->error;
}
- assert(!decoder->xfer); // state machine violation
- assert(xfer->response_code == 0);
+ assert(!decoder->hrs); // state machine violation
+ assert(hrs->response_code == 0);
- decoder->xfer = xfer;
+ decoder->hrs = hrs;
unsigned char code_str[4];
pointer_2_str(&status_code, code_str, 4);
- xfer->response_code = atoi((char*) code_str);
+ hrs->response_code = atoi((char*) code_str);
// the reason phrase is optional, and may contain spaces
@@ -652,13 +718,13 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
}
decoder->is_request = false;
- decoder->is_1_0 = (minor == 0);
- decoder->error = conn->config.xfer_rx_response(decoder->xfer, (char*)version_str,
- xfer->response_code,
- (offset) ? (char*)reason_str: 0);
+ decoder->error = conn->config.rx_response(decoder->hrs,
+ hrs->response_code,
+ (offset) ? (char*)reason_str: 0,
+ major, minor);
if (decoder->error)
- decoder->error_msg = "xfer_rx_response callback error";
+ decoder->error_msg = "hrs_rx_response callback error";
return decoder->error;
}
@@ -666,7 +732,7 @@ static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd
// parse the first line of an incoming http message
//
-static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_start_line(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t line;
@@ -695,7 +761,7 @@ static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder)
// Called after the last incoming header was decoded and passed to the
// application
//
-static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
+static bool process_headers_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
// Flush all buffers processed so far - no longer needed
@@ -721,12 +787,6 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
}
}
- decoder->error = conn->config.xfer_rx_headers_done(decoder->xfer);
- if (decoder->error) {
- decoder->error_msg = "xfer_rx_headers_done callback error";
- return false;
- }
-
// determine if a body is present (ref RFC7230 sec 3.3.3 Message Body Length)
bool has_body;
if (decoder->is_request) {
@@ -740,13 +800,8 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
// size via Content-Length or chunked encoder, OR its length is unspecified
// and the message body is terminated by closing the connection.
//
- http1_transfer_t *xfer = decoder->xfer;
- has_body = !(xfer->is_head_method ||
- xfer->is_connect_method ||
- xfer->response_code == 204 || // No Content
- xfer->response_code == 205 || // Reset Content
- xfer->response_code == 304 || // Not Modified
- IS_INFO_RESPONSE(xfer->response_code));
+ h1_codec_request_state_t *hrs = decoder->hrs;
+ has_body = !(hrs->no_body_method || NO_BODY_RESPONSE(hrs->response_code));
if (has_body) {
// no body if explicit Content-Length of zero
if (decoder->hdr_content_length && decoder->content_length == 0) {
@@ -755,6 +810,12 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
}
}
+ decoder->error = conn->config.rx_headers_done(decoder->hrs, has_body);
+ if (decoder->error) {
+ decoder->error_msg = "hrs_rx_headers_done callback error";
+ return false;
+ }
+
if (has_body) {
// start tracking the body buffer chain
decoder->body_ptr = decoder->read_ptr;
@@ -770,13 +831,13 @@ static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder)
// process a received header to determine message body length, etc.
//
-static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
+static int process_header(h1_codec_connection_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value)
{
int parse_error = decoder->is_request ? HTTP1_STATUS_BAD_REQ : HTTP1_STATUS_SERVER_ERR;
if (strcasecmp("Content-Length", (char*) key) == 0) {
- uint64_t old = decoder->content_length;
- if (sscanf((char*)value, "%"PRIu64, &decoder->content_length) != 1) {
+ intmax_t old = decoder->content_length;
+ if (!_parse_content_length((char*) value, &decoder->content_length)) {
decoder->error_msg = "Malformed Content-Length header";
decoder->error = parse_error;
return decoder->error;
@@ -788,16 +849,8 @@ static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const u
}
decoder->hdr_content_length = true;
- } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
- // check if "chunked" is present and it is the last item in the value
- // string. And remember kids: coding type names are case insensitive!
- // Also note "value" has already been trimmed of whitespace at both
- // ends.
- size_t len = strlen((char*)value);
- if (len >= 7) { // 7 = strlen("chunked")
- const char *ptr = ((char*) value) + len - 7;
- decoder->is_chunked = strcasecmp("chunked", ptr) == 0;
- }
+ } else if (strcasecmp("Transfer-Encoding", (char*) key) == 0) {
+ decoder->is_chunked = _is_transfer_chunked((char*) value);
decoder->hdr_transfer_encoding = true;
}
@@ -807,12 +860,12 @@ static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const u
// Parse out the header key and value
//
-static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t line;
- http1_transfer_t *xfer = decoder->xfer;
- assert(xfer); // else state machine busted
+ h1_codec_request_state_t *hrs = decoder->hrs;
+ assert(hrs); // else state machine busted
if (read_line(rptr, &line)) {
debug_print_iterator_pointer("header:", &line);
@@ -862,9 +915,9 @@ static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
process_header(conn, decoder, key_str, value_str);
if (!decoder->error) {
- decoder->error = conn->config.xfer_rx_header(xfer, (char *)key_str, (char *)value_str);
+ decoder->error = conn->config.rx_header(hrs, (char *)key_str, (char *)value_str);
if (decoder->error)
- decoder->error_msg = "xfer_rx_header callback error";
+ decoder->error_msg = "hrs_rx_header callback error";
}
return !!rptr->remaining;
@@ -880,7 +933,7 @@ static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder)
// Pass message body data up to the application.
//
-static inline int consume_body_data(http1_conn_t *conn, bool flush)
+static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
{
struct decoder_t *decoder = &conn->decoder;
qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -889,9 +942,10 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
// shortcut: if no more data to parse send the entire incoming chain
if (rptr->remaining == 0) {
- decoder->error = conn->config.xfer_rx_body(decoder->xfer, &decoder->incoming,
- body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
- body_ptr->remaining);
+ decoder->error = conn->config.rx_body(decoder->hrs, &decoder->incoming,
+ body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
+ body_ptr->remaining,
+ true);
DEQ_INIT(decoder->incoming);
*body_ptr = NULL_I_PTR;
*rptr = NULL_I_PTR;
@@ -902,7 +956,7 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
// unparsed data. Send any buffers preceding the current read pointer.
qd_buffer_list_t blist = DEQ_EMPTY;
size_t octets = 0;
- const size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+ size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
// invariant:
assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
@@ -928,12 +982,15 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
qd_buffer_insert(tail, body_ptr->remaining);
DEQ_INSERT_TAIL(blist, tail);
octets += body_ptr->remaining;
+ if (DEQ_SIZE(blist) == 1)
+ body_offset = 0;
*body_ptr = *rptr;
body_ptr->remaining = 0;
}
- decoder->error = conn->config.xfer_rx_body(decoder->xfer, &blist, body_offset, octets);
+ if (octets)
+ decoder->error = conn->config.rx_body(decoder->hrs, &blist, body_offset, octets, true);
return decoder->error;
}
@@ -942,7 +999,7 @@ static inline int consume_body_data(http1_conn_t *conn, bool flush)
// parsing the start of a chunked header:
// <chunk size in hex>CRLF
//
-static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t line;
@@ -984,7 +1041,7 @@ static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *deco
// Parse the data section of a chunk
//
-static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1007,7 +1064,7 @@ static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decode
// Keep reading chunk trailers until the terminating empty line is read
//
-static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked_trailer(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1032,7 +1089,7 @@ static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *dec
// parse an incoming message body which is chunk encoded
// Return True if there is more data pending to parse
-static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_chunked(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
bool more = true;
switch (decoder->chunk_state) {
@@ -1056,7 +1113,7 @@ static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
// parse an incoming message body which is Content-Length bytes long
//
-static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body_content(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
qd_iterator_pointer_t *rptr = &decoder->read_ptr;
qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1074,7 +1131,7 @@ static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
}
-static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
if (decoder->is_chunked)
return parse_body_chunked(conn, decoder);
@@ -1084,13 +1141,14 @@ static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
// otherwise no explict body size, so just keep passing the entire unparsed
// incoming chain along until the remote closes the connection
- decoder->error = conn->config.xfer_rx_body(decoder->xfer,
- &decoder->incoming,
- decoder->read_ptr.cursor
- - qd_buffer_base(decoder->read_ptr.buffer),
- decoder->read_ptr.remaining);
+ decoder->error = conn->config.rx_body(decoder->hrs,
+ &decoder->incoming,
+ decoder->read_ptr.cursor
+ - qd_buffer_base(decoder->read_ptr.buffer),
+ decoder->read_ptr.remaining,
+ true);
if (decoder->error) {
- decoder->error_msg = "xfer_rx_body callback error";
+ decoder->error_msg = "hrs_rx_body callback error";
return false;
}
@@ -1103,39 +1161,40 @@ static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
// Called when incoming message is complete
//
-static bool parse_done(http1_conn_t *conn, struct decoder_t *decoder)
+static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
{
- http1_transfer_t *xfer = decoder->xfer;
+ h1_codec_request_state_t *hrs = decoder->hrs;
bool is_response = !decoder->is_request;
- if (!decoder->error) {
- // signal the message receive is complete
- conn->config.xfer_rx_done(xfer);
+ // signal the message receive is complete
+ conn->config.rx_done(hrs);
- if (is_response) { // request<->response transfer complete
-
- // Informational 1xx response codes are NOT teriminal - further responses are allowed!
- if (IS_INFO_RESPONSE(xfer->response_code)) {
- xfer->response_code = 0;
- } else {
- // The message exchange is complete
- conn->config.xfer_done(xfer);
- decoder->xfer = 0;
- http1_transfer_free(xfer);
- }
+ if (is_response) {
+ // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+ if (IS_INFO_RESPONSE(hrs->response_code)) {
+ hrs->response_code = 0;
+ } else {
+ hrs->response_complete = true;
}
+ } else {
+ hrs->request_complete = true;
+ }
- decoder_reset(decoder);
- return !!decoder->read_ptr.remaining;
+ if (hrs->request_complete && hrs->response_complete) {
+ conn->config.request_complete(hrs, false);
+ decoder->hrs = 0;
+ h1_codec_request_state_free(hrs);
}
- return false;
+
+ decoder_reset(decoder);
+ return !!decoder->read_ptr.remaining;
}
// Main decode loop.
// Process received data until it is exhausted
//
-static int decode_incoming(http1_conn_t *conn)
+static int decode_incoming(h1_codec_connection_t *conn)
{
struct decoder_t *decoder = &conn->decoder;
bool more = true;
@@ -1159,20 +1218,18 @@ static int decode_incoming(http1_conn_t *conn)
}
-void *http1_connection_get_context(http1_conn_t *conn)
+void *h1_codec_connection_get_context(h1_codec_connection_t *conn)
{
return conn->context;
}
// Push inbound network data into the http1 protocol engine.
//
-// All of the xfer_rx callback will occur in the context of this call. This
+// All of the hrs_rx callback will occur in the context of this call. This
// returns zero on success otherwise an error code. Any error occuring during
-// a callback will be reflected in the return value of this function. It is
-// expected that the caller will call http1_connection_close on a non-zero
-// return value.
+// a callback will be reflected in the return value of this function.
//
-int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len)
+int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, size_t len)
{
struct decoder_t *decoder = &conn->decoder;
bool init_ptrs = DEQ_IS_EMPTY(decoder->incoming);
@@ -1195,79 +1252,117 @@ int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t
return decode_incoming(conn);
}
-void http1_transfer_set_context(http1_transfer_t *xfer, void *context)
+
+void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context)
{
- xfer->context = context;
+ hrs->context = context;
}
-void *http1_transfer_get_context(const http1_transfer_t *xfer)
+
+void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs)
+{
+ return hrs->context;
+}
+
+
+h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs)
+{
+ return hrs->conn;
+}
+
+
+const char *h1_codec_request_state_method(const h1_codec_request_state_t *hrs)
{
- return xfer->context;
+ return hrs->method;
}
-http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer)
+
+void h1_codec_request_state_cancel(h1_codec_request_state_t *hrs)
{
- return xfer->conn;
+ if (hrs) {
+ h1_codec_connection_t *conn = hrs->conn;
+ if (hrs == conn->decoder.hrs) {
+ decoder_reset(&conn->decoder);
+ }
+ if (hrs == conn->encoder.hrs) {
+ encoder_reset(&conn->encoder);
+ }
+ conn->config.request_complete(hrs, true);
+ h1_codec_request_state_free(hrs);
+ }
}
-// initiate a new HTTP request. This creates a new transfer.
+// initiate a new HTTP request. This creates a new request state.
// request = <method>SP<target>SP<version>CRLF
-// Expects version to be in the format "HTTP/X.Y"
//
-http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version)
+h1_codec_request_state_t *h1_codec_tx_request(h1_codec_connection_t *conn, const char *method, const char *target,
+ uint32_t version_major, uint32_t version_minor)
{
struct encoder_t *encoder = &conn->encoder;
- assert(!encoder->xfer); // error: transfer already in progress
+ assert(!encoder->hrs); // error: transfer already in progress
assert(conn->config.type == HTTP1_CONN_SERVER);
- http1_transfer_t *xfer = encoder->xfer = http1_transfer(conn);
+ h1_codec_request_state_t *hrs = encoder->hrs = h1_codec_request_state(conn);
encoder->is_request = true;
- encoder->crlf_sent = false;
+ encoder->headers_sent = false;
- if (strcmp((char*)method, "HEAD") == 0)
- xfer->is_head_method = true;
- else if (strcmp((char*)method, "CONNECT") == 0)
- xfer->is_connect_method = true;
+ hrs->method = qd_strdup(method);
+
+ // check for methods that do not support body content in the response:
+ hrs->no_body_method = (strcmp((char*)method, "HEAD") == 0 ||
+ strcmp((char*)method, "CONNECT") == 0);
write_string(encoder, method);
write_string(encoder, " ");
write_string(encoder, target);
write_string(encoder, " ");
- write_string(encoder, version);
+ {
+ char version[64];
+ snprintf(version, 64, "HTTP/%"PRIu32".%"PRIu32, version_major, version_minor);
+ write_string(encoder, version);
+ }
write_string(encoder, CRLF);
- return xfer;
+ return hrs;
}
-// Send an HTTP response msg. xfer must correspond to the "oldest" outstanding
-// request that arrived via the xfer_rx_request callback for this connection.
-// version is expected to be in the form "HTTP/x.y"
+// Send an HTTP response msg. hrs must correspond to the "oldest" outstanding
+// request that arrived via the hrs_rx_request callback for this connection.
// status_code is expected to be 100 <= status_code <= 999
// status-line = HTTP-version SP status-code SP reason-phrase CRLF
//
-int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase)
+int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const char *reason_phrase,
+ uint32_t version_major, uint32_t version_minor)
{
- http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
struct encoder_t *encoder = &conn->encoder;
assert(conn->config.type == HTTP1_CONN_CLIENT);
- assert(!encoder->xfer); // error: transfer already in progress
- assert(DEQ_HEAD(conn->xfers) == xfer); // error: response not in order!
- assert(xfer->response_code == 0);
+ assert(!encoder->hrs); // error: transfer already in progress
+ assert(DEQ_HEAD(conn->hrs_queue) == hrs); // error: response not in order!
+ assert(hrs->response_code == 0);
- encoder->xfer = xfer;
+ encoder->hrs = hrs;
encoder->is_request = false;
- encoder->crlf_sent = false;
- xfer->response_code = status_code;
+ encoder->headers_sent = false;
+ hrs->response_code = status_code;
- char code_str[32];
- snprintf(code_str, 32, "%d", status_code);
+ {
+ char version[64];
+ snprintf(version, 64, "HTTP/%"PRIu32".%"PRIu32, version_major, version_minor);
+ write_string(encoder, version);
+ }
- write_string(encoder, version);
write_string(encoder, " ");
- write_string(encoder, code_str);
+
+ {
+ char code_str[32];
+ snprintf(code_str, 32, "%d", status_code);
+ write_string(encoder, code_str);
+ }
+
if (reason_phrase) {
write_string(encoder, " ");
write_string(encoder, reason_phrase);
@@ -1280,17 +1375,26 @@ int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_co
// Add a header field to an outgoing message
// header-field = field-name ":" OWS field-value OWS
-int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value)
+int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value)
{
- http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
struct encoder_t *encoder = &conn->encoder;
- assert(encoder->xfer == xfer); // xfer not current transfer
+ assert(encoder->hrs == hrs); // hrs not current transfer
write_string(encoder, key);
write_string(encoder, ": ");
write_string(encoder, value);
write_string(encoder, CRLF);
+ // determine if the body length is provided. If not
+ // the caller will have to close the connection
+ //
+ if (strcasecmp("Content-Length", (char*) key) == 0) {
+ encoder->hdr_content_length = true;
+ } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) {
+ encoder->is_chunked = _is_transfer_chunked(value);
+ }
+
// check to see if there are any full buffers that can be sent.
qd_buffer_list_t blist = DEQ_EMPTY;
@@ -1300,78 +1404,100 @@ int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *val
DEQ_REMOVE_HEAD(encoder->outgoing);
DEQ_INSERT_TAIL(blist, buf);
octets += qd_buffer_size(buf);
+ buf = DEQ_HEAD(encoder->outgoing);
}
if (!DEQ_IS_EMPTY(blist))
- conn->config.conn_tx_data(conn, &blist, 0, octets);
+ conn->config.tx_buffers(hrs, &blist, octets);
return 0;
}
-// just forward the body chain along
-int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len)
+static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_t *encoder)
{
- http1_conn_t *conn = http1_transfer_get_connection(xfer);
- struct encoder_t *encoder = &conn->encoder;
-
- fprintf(stderr, "http1_tx_body(offset=%zu size=%zu)\n", offset, len);
-
- if (!encoder->crlf_sent) {
+ if (!encoder->headers_sent) {
// need to terminate any headers by sending the plain CRLF that follows
// the headers
write_string(encoder, CRLF);
// flush all pending output. From this point out the outgoing queue is
// no longer used for this message
- fprintf(stderr, "Flushing before body: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
- conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+ hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, qd_buffer_list_length(&encoder->outgoing));
DEQ_INIT(encoder->outgoing);
encoder->write_ptr = NULL_I_PTR;
- encoder->crlf_sent = true;
+ encoder->headers_sent = true;
}
+}
+
+
+// just forward the body chain along
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+ h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+ struct encoder_t *encoder = &conn->encoder;
+
+ if (!encoder->headers_sent)
+ _flush_headers(hrs, encoder);
// skip the outgoing queue and send directly
- fprintf(stderr, "Sending body data %zu bytes\n", len);
- conn->config.conn_tx_data(conn, data, offset, len);
+ conn->config.tx_body_data(hrs, body_data);
return 0;
}
-int http1_tx_done(http1_transfer_t *xfer)
+int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close)
{
- http1_conn_t *conn = http1_transfer_get_connection(xfer);
+ h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
struct encoder_t *encoder = &conn->encoder;
+ if (need_close)
+ *need_close = false;
- if (!encoder->crlf_sent) {
- // need to send the plain CRLF that follows the headers
- write_string(encoder, CRLF);
-
- // flush all pending output.
-
- fprintf(stderr, "Flushing at tx_done: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
- conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
- DEQ_INIT(encoder->outgoing);
- encoder->write_ptr = NULL_I_PTR;
- encoder->crlf_sent = true;
- }
+ if (!encoder->headers_sent)
+ _flush_headers(hrs, encoder);
bool is_response = !encoder->is_request;
- encoder_reset(encoder);
if (is_response) {
- if (IS_INFO_RESPONSE(xfer->response_code)) {
+ if (IS_INFO_RESPONSE(hrs->response_code)) {
// this is a non-terminal response. Another response is expected
// for this request so just reset the transfer state
- xfer->response_code = 0;
+ hrs->response_code = 0;
} else {
- // The message exchange is complete
- conn->config.xfer_done(xfer);
- http1_transfer_free(xfer);
+ hrs->response_complete = true;
+ if (need_close) {
+ // if the message body size is not explicit the connection has
+ // to be closed to indicate end of message
+ if (!hrs->no_body_method &&
+ !NO_BODY_RESPONSE(hrs->response_code) &&
+ !encoder->is_chunked &&
+ !encoder->hdr_content_length) {
+
+ *need_close = true;
+ }
+ }
}
+ } else {
+ hrs->request_complete = true;
+ }
+
+ encoder_reset(encoder);
+ if (hrs->request_complete && hrs->response_complete) {
+ conn->config.request_complete(hrs, false);
+ h1_codec_request_state_free(hrs);
}
return 0;
}
+bool h1_codec_request_complete(const h1_codec_request_state_t *hrs)
+{
+ return hrs && hrs->request_complete;
+}
+
+
+bool h1_codec_response_complete(const h1_codec_request_state_t *hrs)
+{
+ return hrs && hrs->response_complete;
+}
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
new file mode 100644
index 0000000..5533aee
--- /dev/null
+++ b/src/adaptors/http1/http1_private.h
@@ -0,0 +1,272 @@
+#ifndef http1_private_H
+#define http1_private_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+//
+// HTTP/1.x Adaptor Internals
+//
+// Nomenclature:
+// "in": for information flowing from the endpoint into the router
+// (from proactor to core)
+// "out": for information flowing from the router out to the endpoint
+// (from core to proactor)
+//
+#include <qpid/dispatch/http1_codec.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+#include <qpid/dispatch/message.h>
+#include "adaptors/http_common.h"
+
+typedef struct qdr_http1_out_data_t qdr_http1_out_data_t;
+typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
+typedef struct qdr_http1_request_base_t qdr_http1_request_base_t;
+typedef struct qdr_http1_connection_t qdr_http1_connection_t;
+
+DEQ_DECLARE(qdr_http1_connection_t, qdr_http1_connection_list_t);
+
+
+typedef struct qdr_http1_adaptor_t {
+ qdr_core_t *core;
+ qdr_protocol_adaptor_t *adaptor;
+ qd_log_source_t *log;
+ sys_mutex_t *lock; // for the lists
+ qd_http_lsnr_list_t listeners;
+ qd_http_connector_list_t connectors;
+ qdr_http1_connection_list_t connections;
+} qdr_http1_adaptor_t;
+
+extern qdr_http1_adaptor_t *qdr_http1_adaptor;
+
+
+// Data to be written out the raw connection.
+//
+// This adaptor has to cope with two different data sources: the HTTP1 encoder
+// and the qd_message_body_data_t list. The HTTP1 encoder produces a simple
+// qd_buffer_list_t for outgoing header data whose ownership is given to the
+// adaptor: the adaptor is free to deque/free these buffers as needed. The
+// qd_message_body_data_t buffers are shared with the owning message and the
+// buffer list must not be modified by the adaptor. The qdr_http1_out_data_t
+// is used to manage both types of data sources.
+//
+struct qdr_http1_out_data_t {
+ DEQ_LINKS(qdr_http1_out_data_t);
+
+ qdr_http1_out_data_fifo_t *owning_fifo;
+
+ // data is either in a raw buffer chain
+ // or a message body data (not both!)
+
+ qd_buffer_list_t raw_buffers;
+ qd_message_body_data_t *body_data;
+
+ int buffer_count; // # total buffers
+ int next_buffer; // offset to next buffer to send
+ int free_count; // # buffers returned from proton
+};
+ALLOC_DECLARE(qdr_http1_out_data_t);
+DEQ_DECLARE(qdr_http1_out_data_t, qdr_http1_out_data_list_t);
+
+
+//
+// A fifo of outgoing (raw connection) data, oldest at HEAD.
+//
+// write_ptr tracks the point in the fifo where the current out_data node that
+// is being written to the raw connection. As the raw connection returns
+// written buffers (PN_RAW_CONNECTION_WRITTEN) the are removed from the HEAD
+// and freed.
+//
+struct qdr_http1_out_data_fifo_t {
+ qdr_http1_out_data_list_t fifo;
+ qdr_http1_out_data_t *write_ptr;
+};
+
+
+// Per HTTP request/response(s) state.
+//
+// This base class is extended for client and server-specific state, see
+// http1_client.c and http1_server.c A reference is stored in
+// qdr_delivery_get_context(dlv)
+//
+struct qdr_http1_request_base_t {
+ DEQ_LINKS(qdr_http1_request_base_t);
+
+ uint64_t msg_id;
+ h1_codec_request_state_t *lib_rs;
+ qdr_http1_connection_t *hconn; // parent connection
+ char *response_addr; // request reply-to
+
+ // statistics
+ //
+ uint64_t in_http1_octets; // read from raw conn
+ uint64_t out_http1_octets; // written to raw conn
+};
+DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
+
+
+// A single HTTP adaptor connection.
+//
+struct qdr_http1_connection_t {
+ DEQ_LINKS(qdr_http1_connection_t);
+ qd_server_t *qd_server;
+ h1_codec_connection_t *http_conn;
+ pn_raw_connection_t *raw_conn;
+ qdr_connection_t *qdr_conn;
+ qdr_http1_adaptor_t *adaptor;
+
+ uint64_t conn_id;
+ qd_handler_context_t handler_context;
+ h1_codec_connection_type_t type;
+
+ struct {
+ char *host;
+ char *port;
+ char *address;
+ char *host_port;
+ } cfg;
+
+ // State if connected to an HTTP client
+ //
+ struct {
+ char *client_ip_addr;
+ char *reply_to_addr; // set once link is up
+ uint64_t next_msg_id;
+ } client;
+
+ // State if connected to an HTTP server
+ struct {
+ qd_timer_t *reconnect_timer;
+ int reconnect_count;
+ } server;
+
+ // Outgoing link (router ==> HTTP app)
+ //
+ qdr_link_t *out_link;
+ uint64_t out_link_id;
+ int out_link_credit; // provided by adaptor
+
+ // Incoming link (HTTP app ==> router)
+ //
+ qdr_link_t *in_link;
+ uint64_t in_link_id;
+ int in_link_credit; // provided by router
+
+ // Oldest at HEAD
+ //
+ qdr_http1_request_list_t requests;
+
+ // statistics
+ //
+ uint64_t in_http1_octets;
+ uint64_t out_http1_octets;
+
+ // flags
+ //
+ bool trace;
+ bool close_connection;
+};
+ALLOC_DECLARE(qdr_http1_connection_t);
+
+// special AMQP application properties keys for HTTP1 metadata headers
+//
+#define HTTP1_HEADER_PREFIX "http:" // reserved prefix
+#define REQUEST_HEADER_KEY "http:request" // request msg, value=version
+#define RESPONSE_HEADER_KEY "http:response" // response msg, value=version
+#define REASON_HEADER_KEY "http:reason" // from response (optional)
+#define TARGET_HEADER_KEY "http:target" // request target
+#define STATUS_HEADER_KEY "http:status" // response status (integer)
+
+
+// http1_adaptor.c
+//
+//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn);
+//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
+
+void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
+void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data);
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
+void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
+// return the number of buffers currently held by the proactor for writing
+int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data);
+
+void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error);
+void qdr_http1_connection_free(qdr_http1_connection_t *hconn);
+
+void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq);
+void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
+ int error_code,
+ const char *reason);
+void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
+ const qdr_error_t *error);
+
+
+// return the next HTTP token in a comma separated list of tokens
+//
+// start - search for token start pointer
+// len - length of token if non-null returned
+// next - address of start of next token
+//
+const char *qdr_http1_token_list_next(const char *start, size_t *len, const char **next);
+
+
+
+// http1_client.c protocol adaptor callbacks
+//
+void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ int credit);
+uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_delivery_t *delivery,
+ bool settled);
+void qdr_http1_client_core_second_attach(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target);
+void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_http1_request_base_t *hreqb,
+ qdr_delivery_t *dlv,
+ uint64_t disp,
+ bool settled);
+
+
+// http1_server.c protocol adaptor callbacks
+//
+void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ int credit);
+uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_delivery_t *delivery,
+ bool settled);
+void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_http1_request_base_t *hreq,
+ qdr_delivery_t *dlv,
+ uint64_t disp,
+ bool settled);
+
+#endif // http1_private_H
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
new file mode 100644
index 0000000..596f32b
--- /dev/null
+++ b/src/adaptors/http1/http1_server.c
@@ -0,0 +1,1430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "http1_private.h"
+#include "adaptors/adaptor_utils.h"
+
+#include <proton/proactor.h>
+
+//
+// This file contains code specific to HTTP server processing. The raw
+// connection is terminated at an HTTP server, not an HTTP client.
+//
+
+// for debug: dump raw buffers to stdout if true
+#define HTTP1_DUMP_BUFFERS false
+
+
+//
+// State for a single response message arriving via the raw connection. This
+// message will be decoded into a single AMQP message and forwarded into the
+// core.
+//
+// This object is instantiated when the HTTP1 codec indicates the arrival of a
+// response message (See _server_rx_response_cb()). The response is considered
+// "complete" after it has been fully encoded and delivered to the core. The
+// _server_response_msg_t is freed at this point - we do not wait for dispo or
+// settlement from the core since we cannot do anything meaningful should the
+// delivery fail (other than log it).
+//
+typedef struct _server_response_msg_t {
+ DEQ_LINKS(struct _server_response_msg_t);
+
+ struct _server_request_t *hreq; // owning request
+
+ qd_message_t *msg; // hold incoming message
+ qd_composed_field_t *msg_props; // hold incoming headers
+ qdr_delivery_t *dlv; // inbound to router (qdr_link_deliver)
+ bool rx_complete; // response rx complete
+} _server_response_msg_t;
+ALLOC_DECLARE(_server_response_msg_t);
+ALLOC_DEFINE(_server_response_msg_t);
+DEQ_DECLARE(_server_response_msg_t, _server_response_msg_list_t);
+
+
+//
+// State for an HTTP/1.x Request+Response exchange, server facing
+//
+typedef struct _server_request_t {
+ qdr_http1_request_base_t base;
+
+ // The request arrives via the router core in an AMQP message
+ // (qd_message_t). These fields are used to encode the response and send
+ // it out the raw connection.
+ //
+ qdr_delivery_t *request_dlv; // outbound from core_link_deliver
+ uint64_t request_dispo; // set by adaptor during encode
+ bool request_settled; // set by adaptor
+ bool request_acked; // true if dispo sent to core
+ bool request_encoded; // true when encoding done
+ bool headers_encoded; // True when header encode done
+
+ qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn
+
+ _server_response_msg_list_t responses; // response(s) to this request
+
+ bool codec_completed; // Request and Response HTTP msgs OK
+ bool cancelled;
+ bool close_on_complete; // close the conn when this request is complete
+} _server_request_t;
+ALLOC_DECLARE(_server_request_t);
+ALLOC_DEFINE(_server_request_t);
+
+
+//
+// This file contains code specific to HTTP server processing. The raw
+// connection is terminated at an HTTP server, not an HTTP client.
+//
+
+
+#define DEFAULT_CAPACITY 250
+#define RETRY_PAUSE_MSEC 500
+#define MAX_RECONNECT 5 // 5 * 500 = 2.5 sec
+
+static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
+static void _server_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
+ const char *method,
+ const char *target,
+ uint32_t version_major,
+ uint32_t version_minor);
+static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
+ int status_code,
+ const char *reason_phrase,
+ uint32_t version_major,
+ uint32_t version_minor);
+static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value);
+static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body);
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+ bool more);
+static void _server_rx_done_cb(h1_codec_request_state_t *hrs);
+static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled);
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
+static void _do_reconnect(void *context);
+static void _server_response_msg_free(_server_request_t *req, _server_response_msg_t *rmsg);
+static void _server_request_free(_server_request_t *req);
+static void _server_connection_free(qdr_http1_connection_t *hconn);
+static void _write_pending_request(_server_request_t *req);
+static void _cancel_request(_server_request_t *req);
+
+
+////////////////////////////////////////////////////////
+// HTTP/1.x Server Connector
+////////////////////////////////////////////////////////
+
+
+// An HttpConnector has been created. Create an qdr_http_connection_t for it.
+// Do not create a raw connection - this is done on demand when the router
+// sends a delivery over the connector.
+//
+static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ctor,
+ qd_dispatch_t *qd,
+ const qd_http_bridge_config_t *bconfig)
+{
+ qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
+
+ ZERO(hconn);
+ hconn->type = HTTP1_CONN_SERVER;
+ hconn->qd_server = qd->server;
+ hconn->adaptor = qdr_http1_adaptor;
+ hconn->handler_context.handler = &_handle_connection_events;
+ hconn->handler_context.context = hconn;
+ hconn->cfg.host = qd_strdup(bconfig->host);
+ hconn->cfg.port = qd_strdup(bconfig->port);
+ hconn->cfg.address = qd_strdup(bconfig->address);
+ hconn->cfg.host_port = qd_strdup(bconfig->host_port);
+
+ // for initiating a connection to the server
+ hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_reconnect, hconn);
+
+ // Create the qdr_connection
+
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_OUTGOING, //qd_direction_t dir,
+ hconn->cfg.host_port, //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "HTTP/1.x Adaptor", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
+ hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
+ qdr_http1_adaptor->adaptor,
+ false, // incoming
+ QDR_ROLE_NORMAL,
+ 1, // cost
+ hconn->conn_id,
+ 0, // label
+ 0, // remote container id
+ false, // strip annotations in
+ false, // strip annotations out
+ false, // allow dynamic link routes
+ false, // allow admin status update
+ DEFAULT_CAPACITY,
+ 0, // vhost
+ info,
+ 0, // bind context
+ 0); // bind token
+ qdr_connection_set_context(hconn->qdr_conn, hconn);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%i] HTTP connection to server created", hconn->conn_id);
+
+ // wait for the raw connection to come up before creating the in and out links
+
+ hconn->raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ return hconn;
+}
+
+
+// Management Agent API - Create
+//
+qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
+{
+ qd_http_connector_t *c = qd_http_connector(qd->server);
+ if (!c) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http connector: no memory");
+ return 0;
+ }
+
+ DEQ_ITEM_INIT(c);
+ qdr_http1_connection_t *hconn = _create_server_connection(c, qd, config);
+ if (hconn) {
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ // activate the raw connection. This connection may be scheduled on
+ // another thread by this call:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Initiating connection to HTTP server %s",
+ hconn->conn_id, hconn->cfg.host_port);
+ pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+ return c;
+ } else {
+ qd_http_connector_decref(c);
+ c = 0;
+ }
+
+ return c;
+}
+
+
+// Management Agent API - Delete
+//
+void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
+{
+ if (ct) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ qd_http_connector_decref(ct);
+
+ // TODO(kgiusti): do we now close all related connections?
+ }
+}
+
+
+
+
+////////////////////////////////////////////////////////
+// Raw Connector Events
+////////////////////////////////////////////////////////
+
+
+// Create the qdr links and HTTP codec when the server connection comes up.
+// These links & codec will persist across temporary drops in the connection to
+// the server (like when closing the connection to indicate end of response
+// message). However if the connection to the server cannot be re-established
+// in a "reasonable" amount of time we consider the server unavailable and
+// these links and codec will be closed - aborting any pending requests. Once
+// the connection to the server is reestablished these links & codec will be
+// recreated.
+//
+static void _setup_server_links(qdr_http1_connection_t *hconn)
+{
+ if (!hconn->in_link) {
+ // simulate an anonymous link for responses from the server
+ hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target
+ "http1.server.in", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(hconn->in_link_id));
+ qdr_link_set_context(hconn->in_link, hconn);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP server response link created",
+ hconn->conn_id, hconn->in_link_id);
+ }
+
+ if (!hconn->out_link) {
+ // simulate a server subscription for its service address
+ qdr_terminus_t *source = qdr_terminus(0);
+ qdr_terminus_set_address(source, hconn->cfg.address);
+ hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
+ QD_OUTGOING,
+ source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "http1.server.out", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ 0, // initial delivery
+ &(hconn->out_link_id));
+ qdr_link_set_context(hconn->out_link, hconn);
+
+ hconn->out_link_credit = DEFAULT_CAPACITY;
+ qdr_link_flow(hconn->adaptor->core, hconn->out_link, DEFAULT_CAPACITY, false);
+
+ qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP server request link created",
+ hconn->conn_id, hconn->out_link_id);
+ }
+
+ if (!hconn->http_conn) {
+ h1_codec_config_t config = {0};
+ config.type = HTTP1_CONN_SERVER;
+ config.tx_buffers = _server_tx_buffers_cb;
+ config.tx_body_data = _server_tx_body_data_cb;
+ config.rx_request = _server_rx_request_cb;
+ config.rx_response = _server_rx_response_cb;
+ config.rx_header = _server_rx_header_cb;
+ config.rx_headers_done = _server_rx_headers_done_cb;
+ config.rx_body = _server_rx_body_cb;
+ config.rx_done = _server_rx_done_cb;
+ config.request_complete = _server_request_complete_cb;
+ hconn->http_conn = h1_codec_connection(&config, hconn);
+ }
+}
+
+
+// Tear down the qdr links and the codec. This is called when the
+// connection to the server has dropped and cannot be re-established in a
+// timely manner.
+//
+static void _teardown_server_links(qdr_http1_connection_t *hconn)
+{
+ // @TODO(kgiusti): should we PN_RELEASE all unsent outbound deliveries first?
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ while (hreq) {
+ _server_request_free(hreq);
+ hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ }
+ h1_codec_connection_free(hconn->http_conn);
+ hconn->http_conn = 0;
+
+ if (hconn->out_link) {
+ qdr_link_set_context(hconn->out_link, 0);
+ qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
+ hconn->out_link = 0;
+ }
+
+ if (hconn->in_link) {
+ qdr_link_set_context(hconn->in_link, 0);
+ qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
+ hconn->in_link = 0;
+ }
+}
+
+
+// This adapter attempts to keep the connection to the server up as long as the
+// connector is configured. This is called via a timer scheduled when the
+// PN_CONNECTION_CLOSE event is handled.
+//
+static void _do_reconnect(void *context)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+ if (!hconn->raw_conn) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
+ hconn->raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+ // this call may reschedule the connection on another I/O thread:
+ pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+ }
+}
+
+
+// Proton Raw Connection Events
+//
+static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+ qd_log_source_t *log = qdr_http1_adaptor->log;
+
+ qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
+
+ if (!hconn) return;
+
+ switch (pn_event_type(e)) {
+
+ case PN_RAW_CONNECTION_CONNECTED: {
+ hconn->server.reconnect_count = 0;
+ _setup_server_links(hconn);
+ while (qdr_connection_process(hconn->qdr_conn)) {}
+ break;
+ }
+ case PN_RAW_CONNECTION_CLOSED_READ: {
+ // notify the codec so it can complete the current response
+ // message (response body terminated on connection closed)
+ h1_codec_connection_closed(hconn->http_conn);
+ }
+ // fall through
+ case PN_RAW_CONNECTION_CLOSED_WRITE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for %s", hconn->conn_id,
+ pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
+ ? "reading" : "writing");
+ pn_raw_connection_close(hconn->raw_conn);
+ break;
+ }
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ pn_raw_connection_set_context(hconn->raw_conn, 0);
+ hconn->raw_conn = 0;
+ hconn->close_connection = false;
+
+ if (!hconn->qdr_conn) {
+ // the router has closed this connection so do not try to
+ // re-establish it
+ qd_log(log, QD_LOG_INFO, "[C%i] Connection closed", hconn->conn_id);
+ _server_connection_free(hconn);
+ return;
+ }
+
+ // if the current request was not completed, cancel it. it's ok if
+ // there are outstanding *response* deliveries in flight as long as the
+ // response(s) have been completely received from the server
+ // (request_complete == true).
+
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq && !hreq->codec_completed && hreq->base.out_http1_octets > 0) {
+ _cancel_request(hreq);
+ }
+
+ // reconnect to the server. Leave the links intact so pending requests
+ // are not aborted. Once we've failed to reconnect after MAX_RECONNECT
+ // tries drop the links to prevent additional request from arriving.
+ //
+ qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
+ if (hconn->server.reconnect_count == MAX_RECONNECT) {
+ qd_log(log, QD_LOG_INFO, "[C%i] Server not responding - disconnecting...", hconn->conn_id);
+ _teardown_server_links(hconn);
+ } else {
+ hconn->server.reconnect_count += 1; // increase next sleep interval
+ }
+ qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
+ return;
+ }
+ case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
+ _write_pending_request((_server_request_t*) DEQ_HEAD(hconn->requests));
+ break;
+ }
+ case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Need read buffers", hconn->conn_id);
+ // @TODO(kgiusti): backpressure if no credit
+ // if (hconn->in_link_credit > 0 */)
+ if (!hconn->close_connection) {
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
+ }
+ break;
+ }
+ case PN_RAW_CONNECTION_WAKE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Wake-up", hconn->conn_id);
+ while (qdr_connection_process(hconn->qdr_conn)) {}
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Connection processing complete", hconn->conn_id);
+ break;
+ }
+ case PN_RAW_CONNECTION_READ: {
+ qd_buffer_list_t blist;
+ uintmax_t length;
+ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+
+ if (HTTP1_DUMP_BUFFERS) {
+ fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+ qd_buffer_t *bb = DEQ_HEAD(blist);
+ while (bb) {
+ fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+ bb = DEQ_NEXT(bb);
+ }
+ fflush(stdout);
+ }
+
+ if (length) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server",
+ hconn->conn_id, hconn->in_link_id, length);
+ hconn->in_http1_octets += length;
+ int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ if (error)
+ qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+ }
+ break;
+ }
+ case PN_RAW_CONNECTION_WRITTEN: {
+ qdr_http1_free_written_buffers(hconn);
+ break;
+ }
+ default:
+ break;
+ }
+
+ // remove me:
+ if (hconn) {
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
+ qd_log(log, QD_LOG_DEBUG, " codec_completed=%s cancelled=%s",
+ hreq->codec_completed ? "Complete" : "Not Complete",
+ hreq->cancelled ? "Cancelled" : "Not Cancelled");
+ qd_log(log, QD_LOG_DEBUG, " Req: dlv=%p dispo=%"PRIu64" settled=%d acked=%d",
+ (void*) hreq->request_dlv, hreq->request_dispo, hreq->request_settled,
+ hreq->request_acked);
+ qd_log(log, QD_LOG_DEBUG, " Req: out_data=%d pton=%d resp-count=%d",
+ (int) DEQ_SIZE(hreq->out_data.fifo),
+ qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
+ (int) DEQ_SIZE(hreq->responses));
+ }
+ }
+
+ // Check for completed or cancelled requests
+
+ bool need_close = false;
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+
+ if (hreq->cancelled) {
+
+ // request: have to wait until all buffers returned from proton
+ // before we can release the request delivery...
+ if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
+ return;
+
+ if (hreq->request_dlv) {
+ // let the message drain... (TODO@(kgiusti) is this necessary?
+ if (!qdr_delivery_receive_complete(hreq->request_dlv))
+ return;
+
+ uint64_t dispo = hreq->request_dispo || PN_MODIFIED;
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ dispo,
+ true, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+ hreq->request_dlv = 0;
+ }
+
+ _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg) {
+ if (rmsg->dlv) {
+ qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+ qdr_delivery_set_aborted(rmsg->dlv, true);
+ }
+ _server_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
+
+ // The state of the connection to the server will be unknown if
+ // this request was not completed.
+ if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
+ need_close = true;
+
+ _server_request_free(hreq);
+
+ } else {
+
+ // Can the request disposition be updated? Disposition can be
+ // updated after the entire encoded request has been written to the
+ // server.
+ if (!hreq->request_acked &&
+ hreq->request_encoded &&
+ DEQ_SIZE(hreq->out_data.fifo) == 0) {
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ false, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ hreq->request_acked = true;
+ }
+
+ // Can we settle request? Settle the request delivery after all
+ // response messages have been received from the server
+ // (codec_complete). Note that the responses may not have finished
+ // being delivered to the core (lack of credit, etc.)
+ //
+ if (!hreq->request_settled &&
+ hreq->request_acked && // implies out_data done
+ hreq->codec_completed) {
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ true, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ // can now release the delivery
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+ hreq->request_dlv = 0;
+
+ hreq->request_settled = true;
+ }
+
+ // Has the entire request/response completed? It is complete after
+ // the request message has been settled and all responses have been
+ // delivered to the core.
+ //
+ if (hreq->request_acked &&
+ hreq->request_settled &&
+ DEQ_SIZE(hreq->responses) == 0) {
+
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+ _server_request_free(hreq);
+
+ hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq)
+ _write_pending_request(hreq);
+ }
+ }
+ }
+
+ if (need_close) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+ qdr_http1_close_connection(hconn, "Request cancelled");
+ }
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// HTTP/1.x Encoder/Decoder Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+// Encoder has a buffer list to send to the server
+//
+static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
+ hconn->conn_id, hconn->out_link_id, len);
+ qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
+ if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+ _write_pending_request(hreq);
+ }
+}
+
+
+// Encoder has body data to send to the server
+//
+static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
+ hconn->conn_id, hconn->out_link_id);
+ qdr_http1_enqueue_body_data(&hreq->out_data, body_data);
+ if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) {
+ _write_pending_request(hreq);
+ }
+}
+
+
+// Server will not be sending us HTTP requests
+//
+static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
+ const char *method,
+ const char *target,
+ uint32_t version_major,
+ uint32_t version_minor)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
+ "[C%"PRIu64"][L%"PRIu64"] Spurious HTTP request received from server",
+ hconn->conn_id, hconn->in_link_id);
+ return HTTP1_STATUS_BAD_REQ;
+}
+
+
+// called when decoding an HTTP response from the server.
+//
+static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
+ int status_code,
+ const char *reason_phrase,
+ uint32_t version_major,
+ uint32_t version_minor)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ // expected to be in-order
+ assert(hreq && hreq == (_server_request_t*) DEQ_HEAD(hconn->requests));
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
+ hconn->conn_id, hconn->in_link_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
+ version_major, version_minor);
+
+ _server_response_msg_t *rmsg = new__server_response_msg_t();
+ ZERO(rmsg);
+ rmsg->hreq = hreq;
+ DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+ rmsg->msg_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+ qd_compose_start_map(rmsg->msg_props);
+ {
+ char version[64];
+ snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
+ qd_compose_insert_symbol(rmsg->msg_props, RESPONSE_HEADER_KEY);
+ qd_compose_insert_string(rmsg->msg_props, version);
+
+ qd_compose_insert_symbol(rmsg->msg_props, STATUS_HEADER_KEY);
+ qd_compose_insert_int(rmsg->msg_props, (int32_t)status_code);
+
+ if (reason_phrase) {
+ qd_compose_insert_symbol(rmsg->msg_props, REASON_HEADER_KEY);
+ qd_compose_insert_string(rmsg->msg_props, reason_phrase);
+ }
+ }
+
+ return 0;
+}
+
+
+// called for each decoded HTTP header.
+//
+static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"]L%"PRIu64"] HTTP response header received: key='%s' value='%s'",
+ hconn->conn_id, hconn->in_link_id, key, value);
+
+ // expect: running incoming request at tail
+ _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+ assert(rmsg);
+
+ // We need to filter the connection header out
+ // @TODO(kgiusti): also have to remove headers given in value!
+ if (strcasecmp(key, "connection") != 0) {
+ qd_compose_insert_symbol(rmsg->msg_props, key);
+ qd_compose_insert_string(rmsg->msg_props, value);
+ }
+
+ return 0;
+}
+
+
+// called after the last header is decoded, before decoding any body data.
+//
+static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64" HTTP response headers done.",
+ hconn->conn_id, hconn->in_link_id);
+
+ // expect: running incoming request at tail
+ _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+ assert(rmsg && !rmsg->msg);
+
+ // start building the AMQP message
+
+ rmsg->msg = qd_message();
+
+ qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(hdrs);
+ qd_compose_insert_bool(hdrs, 0); // durable
+ qd_compose_insert_null(hdrs); // priority
+ //qd_compose_insert_null(hdrs); // ttl
+ //qd_compose_insert_bool(hdrs, 0); // first-acquirer
+ //qd_compose_insert_uint(hdrs, 0); // delivery-count
+ qd_compose_end_list(hdrs);
+
+ qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
+ qd_compose_start_list(props);
+ qd_compose_insert_null(props); // message-id
+ qd_compose_insert_null(props); // user-id
+ qd_compose_insert_string(props, hreq->base.response_addr); // to
+ // subject:
+ qd_compose_insert_string(props, h1_codec_request_state_method(hrs));
+ qd_compose_insert_null(props); // reply-to
+ qd_compose_insert_ulong(props, hreq->base.msg_id); // correlation-id
+ qd_compose_end_list(props);
+
+ qd_compose_end_map(rmsg->msg_props);
+
+ if (!has_body) {
+ // @TODO(kgiusti): fixme: tack on an empty body data performative. The
+ // message decoder will barf otherwise
+ qd_buffer_list_t empty = DEQ_EMPTY;
+ rmsg->msg_props = qd_compose(QD_PERFORMATIVE_BODY_DATA, rmsg->msg_props);
+ qd_compose_insert_binary_buffers(rmsg->msg_props, &empty);
+ }
+
+ qd_message_compose_3(rmsg->msg, props, rmsg->msg_props, !has_body);
+ qd_compose_free(props);
+ qd_compose_free(rmsg->msg_props);
+ rmsg->msg_props = 0;
+
+ // start delivery if possible
+ if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) {
+ hconn->in_link_credit -= 1;
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Delivering response to router addr=%s",
+ hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+
+ qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
+ assert(addr);
+ qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
+ rmsg->dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
+ qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
+ qdr_delivery_incref(rmsg->dlv, "referenced by HTTP1 adaptor");
+ rmsg->msg = 0; // now owned by delivery
+ }
+
+ return 0;
+}
+
+
+// Called with decoded body data. This may be called multiple times as body
+// data becomes available.
+//
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+ bool more)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+
+ qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
+ hconn->conn_id, hconn->in_link_id, len);
+
+ if (offset) {
+ // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
+ // Remove the offset by shifting the content of the head buffer forward
+ //
+ qd_buffer_t *head = DEQ_HEAD(*body);
+ memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
+ head->size -= offset;
+ }
+
+ //
+ // Compose a DATA performative for this section of the stream
+ //
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary_buffers(field, body);
+
+ //
+ // Extend the streaming message and free the composed field
+ //
+ qd_message_extend(msg, field);
+ qd_compose_free(field);
+
+ //
+ // Notify the router that more data is ready to be pushed out on the delivery
+ //
+ if (!more)
+ qd_message_set_receive_complete(msg);
+
+ if (rmsg->dlv)
+ qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
+
+ return 0;
+}
+
+// Called at the completion of response decoding.
+//
+static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+
+ qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP response receive complete.",
+ hconn->conn_id, hconn->in_link_id);
+
+ rmsg->rx_complete = true;
+
+ if (!qd_message_receive_complete(msg)) {
+ qd_message_set_receive_complete(msg);
+ if (rmsg->dlv) {
+ qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
+ }
+ }
+
+ if (rmsg->dlv) {
+ // We've finished the delivery, and don't care about outcome/settlement
+ _server_response_msg_free(hreq, rmsg);
+ }
+}
+
+
+// called at the completion of a full Request/Response exchange, or as a result
+// of cancelling the request. The hrs will be deleted on return from this
+// call. Any hrs related state must be released before returning from this
+// callback.
+//
+// Note: in the case where the request had multiple response messages, this
+// call occurs when the LAST response has been completely received
+// (_server_rx_done_cb())
+//
+static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled)
+{
+ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+
+ hreq->base.lib_rs = 0;
+ hreq->cancelled = hreq->cancelled || cancelled;
+ hreq->codec_completed = !hreq->cancelled;
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP request/response %s.", hconn->conn_id,
+ cancelled ? "cancelled!" : "codec done");
+}
+
+
+//////////////////////////////////////////////////////////////////////
+// Router Protocol Adapter Callbacks
+//////////////////////////////////////////////////////////////////////
+
+
+// credit has been granted - responses may now be sent to the
+// router core.
+//
+void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ int credit)
+{
+ assert(link == hconn->in_link); // router only grants flow on incoming link
+
+ assert(qdr_link_is_anonymous(link)); // remove me
+ hconn->in_link_credit += credit;
+
+ qd_log(adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Credit granted on response link: %d",
+ hconn->conn_id, hconn->in_link_id, hconn->in_link_credit);
+
+ if (hconn->in_link_credit > 0) {
+
+ if (hconn->raw_conn)
+ qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+
+ // check for pending responses that are blocked for credit
+
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+ _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg && rmsg->msg && hconn->in_link_credit > 0) {
+ assert(!rmsg->dlv);
+ hconn->in_link_credit -= 1;
+
+ qd_log(adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Delivering blocked response to router addr=%s",
+ hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+
+ qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
+ qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
+ qdr_delivery_t *dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
+ if (!rmsg->rx_complete) {
+ // stop here since response must be complete before we can deliver the next one.
+ rmsg->dlv = dlv;
+ qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
+ qdr_delivery_incref(rmsg->dlv, "referenced by HTTP1 adaptor");
+ rmsg->msg = 0;
+ break;
+ }
+
+ // the delivery is complete no need to save it
+ _server_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
+ }
+ }
+}
+
+
+// Handle disposition/settlement update for the outstanding HTTP response.
+//
+void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_http1_request_base_t *hbase,
+ qdr_delivery_t *dlv,
+ uint64_t disp,
+ bool settled)
+{
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP response delivery update, outcome=0x%"PRIx64"%s",
+ hconn->conn_id, hconn->in_link_id, disp, settled ? " settled": "");
+
+ // Not much can be done with error dispositions (I think)
+ if (disp != PN_ACCEPTED) {
+ qd_log(adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] response message not received, outcome=0x%"PRIx64,
+ hconn->conn_id, hconn->in_link_id, disp);
+ }
+}
+
+
+//
+// Request message forwarding
+//
+
+
+// Create a request context for a new request in msg, which is valid to a depth
+// of at least QD_DEPTH_PROPERTIES
+//
+static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, qd_message_t *msg)
+{
+ uint64_t msg_id = 0;
+ char *reply_to = 0;
+ bool ok = false;
+ qd_parsed_field_t *msg_id_pf = 0;
+
+ qd_iterator_t *msg_id_itr = qd_message_field_iterator_typed(msg, QD_FIELD_MESSAGE_ID); // ulong
+ if (msg_id_itr) {
+ msg_id_pf = qd_parse(msg_id_itr);
+ if (msg_id_pf && qd_parse_ok(msg_id_pf)) {
+ msg_id = qd_parse_as_ulong(msg_id_pf);
+ ok = qd_parse_ok(msg_id_pf);
+ }
+ }
+ qd_parse_free(msg_id_pf);
+ qd_iterator_free(msg_id_itr);
+
+ if (!ok) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting message missing id.",
+ hconn->conn_id, hconn->out_link_id);
+ return 0;
+ }
+
+ qd_iterator_t *reply_to_itr = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+ reply_to = (char*) qd_iterator_copy(reply_to_itr);
+ qd_iterator_free(reply_to_itr);
+
+ assert(reply_to && strlen(reply_to)); // remove me
+ if (!reply_to) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
+ hconn->conn_id, hconn->out_link_id);
+ return 0;
+ }
+
+ _server_request_t *hreq = new__server_request_t();
+ ZERO(hreq);
+ hreq->base.hconn = hconn;
+ hreq->base.msg_id = msg_id;
+ hreq->base.response_addr = reply_to;
+ DEQ_INIT(hreq->out_data.fifo);
+ DEQ_INIT(hreq->responses);
+ DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg_id=%"PRIu64" reply-to=%s.",
+ hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
+ return hreq;
+}
+
+
+// Start a new request to the server. msg has been validated to at least
+// application properties depth. Returns 0 on success.
+//
+static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg)
+{
+ // start encoding HTTP request. Need method, target and version
+
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ char *method_str = 0;
+ char *target_str = 0;
+ qd_parsed_field_t *app_props = 0;
+ uint32_t major = 1;
+ uint32_t minor = 1;
+ uint64_t outcome = 0;
+
+ assert(!hreq->base.lib_rs);
+
+ // method is passed in the SUBJECT field
+ qd_iterator_t *method_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
+ if (!method_iter) {
+ return PN_REJECTED;
+ }
+
+ method_str = (char*) qd_iterator_copy(method_iter);
+ qd_iterator_free(method_iter);
+ if (!method_str) {
+ return PN_REJECTED;
+ }
+
+ // target, version info and other headers are in the app properties
+ qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+ if (!app_props_iter) {
+ outcome = PN_REJECTED;
+ goto exit;
+ }
+
+ app_props = qd_parse(app_props_iter);
+ qd_iterator_free(app_props_iter);
+ if (!app_props) {
+ outcome = PN_REJECTED;
+ goto exit;
+ }
+
+ qd_parsed_field_t *ref = qd_parse_value_by_key(app_props, TARGET_HEADER_KEY);
+ target_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
+ if (!target_str) {
+ outcome = PN_REJECTED;
+ goto exit;
+ }
+
+
+ // Pull the version info from the app properties (e.g. "1.1")
+ ref = qd_parse_value_by_key(app_props, REQUEST_HEADER_KEY);
+ if (ref) { // optional
+ char *version_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
+ if (version_str)
+ sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
+ free(version_str);
+ }
+
+ // done copying and converting!
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding request method=%s target=%s",
+ hconn->conn_id, hconn->out_link_id, method_str, target_str);
+
+ hreq->base.lib_rs = h1_codec_tx_request(hconn->http_conn, method_str, target_str, major, minor);
+ if (!hreq->base.lib_rs) {
+ outcome = PN_REJECTED;
+ goto exit;
+ }
+
+ h1_codec_request_state_set_context(hreq->base.lib_rs, (void*) hreq);
+
+ // now send all headers in app properties
+ qd_parsed_field_t *key = qd_field_first_child(app_props);
+ bool ok = true;
+ while (ok && key) {
+ qd_parsed_field_t *value = qd_field_next_child(key);
+ if (!value)
+ break;
+
+ qd_iterator_t *i_key = qd_parse_raw(key);
+ if (!i_key)
+ break;
+
+ // ignore the special headers added by the mapping
+ if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
+ qd_iterator_t *i_value = qd_parse_raw(value);
+ if (!i_value)
+ break;
+
+ char *header_key = (char*) qd_iterator_copy(i_key);
+ char *header_value = (char*) qd_iterator_copy(i_value);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding request header %s:%s",
+ hconn->conn_id, hconn->out_link_id,
+ header_key, header_value);
+
+ ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
+
+ free(header_key);
+ free(header_value);
+ }
+
+ key = qd_field_next_child(value);
+ }
+
+ if (!ok)
+ outcome = PN_REJECTED;
+
+exit:
+
+ free(method_str);
+ free(target_str);
+ qd_parse_free(app_props);
+
+ return outcome;
+}
+
+
+// Encode an outbound AMQP message as an HTTP Request. Returns PN_ACCEPTED
+// when complete, 0 if incomplete and PN_REJECTED if encoding error.
+//
+static uint64_t _encode_request_message(_server_request_t *hreq)
+{
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);
+ qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
+
+ if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+ return 0;
+
+ if (status == QD_MESSAGE_DEPTH_INVALID) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+
+ assert(status == QD_MESSAGE_DEPTH_OK);
+
+ if (!hreq->headers_encoded) {
+ uint64_t outcome = _send_request_headers(hreq, msg);
+ if (outcome) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message.", hconn->conn_id, hconn->out_link_id);
+ return outcome;
+ }
+ hreq->headers_encoded = true;
+ }
+
+ qd_message_body_data_t *body_data = 0;
+
+ while (true) {
+ switch (qd_message_next_body_data(msg, &body_data)) {
+ case QD_MESSAGE_BODY_DATA_OK: {
+
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Encoding request body data",
+ hconn->conn_id, hconn->out_link_id);
+
+ if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+ break;
+ }
+
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
+ // indicate this message is complete
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_ACCEPTED;
+
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] body data need more",
+ hconn->conn_id, hconn->out_link_id);
+ return 0; // wait for more
+
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
+ hconn->conn_id, hconn->out_link_id);
+ return PN_REJECTED;
+ }
+ }
+}
+
+
+// The router wants to send this delivery out the link. This is either the
+// start of a new incoming HTTP request or the continuation of an existing one.
+// Note: returning a non-zero value will cause the delivery to be settled!
+//
+uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ qdr_link_t *link,
+ qdr_delivery_t *delivery,
+ bool settled)
+{
+ qd_message_t *msg = qdr_delivery_message(delivery);
+ _server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
+
+ if (!hreq) {
+ // new delivery - create new request:
+ switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
+ case QD_MESSAGE_DEPTH_INCOMPLETE:
+ return 0;
+
+ case QD_MESSAGE_DEPTH_INVALID:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
+ hconn->conn_id, link->identity);
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+ return PN_REJECTED;
+
+ case QD_MESSAGE_DEPTH_OK:
+ hreq = _create_request_context(hconn, msg);
+ if (!hreq) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+ return PN_REJECTED;
+ }
+
+ hreq->request_dlv = delivery;
+ qdr_delivery_set_context(delivery, (void*) hreq);
+ qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
+ break;
+ }
+ }
+
+ if (!hreq->request_dispo)
+ hreq->request_dispo = _encode_request_message(hreq);
+
+ if (hreq->request_dispo && qd_message_receive_complete(msg)) {
+
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+
+ if (hreq->request_dispo == PN_ACCEPTED) {
+ hreq->request_encoded = true;
+ h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
+
+ } else {
+ // mapping to HTTP request failed:
+ _cancel_request(hreq);
+ }
+ }
+
+ return 0;
+}
+
+
+//
+// Misc
+//
+
+// free the response message
+//
+static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg)
+{
+ DEQ_REMOVE(hreq->responses, rmsg);
+ qd_message_free(rmsg->msg);
+ qd_compose_free(rmsg->msg_props);
+ if (rmsg->dlv) {
+ qdr_delivery_set_context(rmsg->dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 adaptor response freed");
+ }
+ free__server_response_msg_t(rmsg);
+}
+
+
+// Release the request
+//
+static void _server_request_free(_server_request_t *hreq)
+{
+ if (hreq) {
+ qdr_http1_request_base_cleanup(&hreq->base);
+ if (hreq->request_dlv) {
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request freed");
+ }
+
+ qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+
+ _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg) {
+ _server_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
+
+ free__server_request_t(hreq);
+ }
+}
+
+
+static void _write_pending_request(_server_request_t *hreq)
+{
+ if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+ assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
+ uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
+ hreq->base.out_http1_octets += written;
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%i] %"PRIu64" octets written",
+ hreq->base.hconn->conn_id, written);
+ }
+}
+
+
+static void _server_connection_free(qdr_http1_connection_t *hconn)
+{
+ for (_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ hreq;
+ hreq = (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+ _server_request_free(hreq);
+ }
+ qdr_http1_connection_free(hconn);
+}
+
+
+static void _cancel_request(_server_request_t *hreq)
+{
+ if (!hreq->base.lib_rs) {
+ // never even got to encoding it - manually mark it cancelled
+ hreq->cancelled = true;
+ } else {
+ // cleanup codec state - this will call _server_request_complete_cb()
+ // with cancelled = true
+ h1_codec_request_state_cancel(hreq->base.lib_rs);
+ }
+
+ // cleanup occurs at the end of the connection event handler
+}
diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c
index c650875..6eccdad 100644
--- a/src/adaptors/http_common.c
+++ b/src/adaptors/http_common.c
@@ -43,10 +43,10 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t
config->address = qd_entity_get_string(entity, "address"); CHECK();
version_str = qd_entity_get_string(entity, "protocolVersion"); CHECK();
- if (strncmp(version_str, "HTTP/1", 6) == 0) {
- config->version = VERSION_HTTP1;
- } else {
+ if (strcmp(version_str, "HTTP2") == 0) {
config->version = VERSION_HTTP2;
+ } else {
+ config->version = VERSION_HTTP1;
}
free(version_str);
version_str = 0;
diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h
index e8a7ff9..2339146 100644
--- a/src/adaptors/http_common.h
+++ b/src/adaptors/http_common.h
@@ -66,7 +66,8 @@ struct qd_http_connector_t {
qd_server_t *server;
qd_timer_t *timer;
long delay;
- struct qdr_http_connection_t *dispatcher; // pseudo egress connection
+ struct qdr_http_connection_t *dispatcher;
+
DEQ_LINKS(qd_http_connector_t);
};
DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);
diff --git a/src/message.c b/src/message.c
index 286f523..c5d8725 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2444,7 +2444,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
//
int idx = 0;
while (idx < count && !!buffer) {
- buffers[idx].context = 0;
+ buffers[idx].context = 0; // reserved for use by caller - do not modify!
buffers[idx].bytes = (char*) qd_buffer_base(buffer) + (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
buffers[idx].capacity = BUFFER_SIZE;
buffers[idx].size = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 828e464..c10ccb4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -150,6 +150,7 @@ foreach(py_test_module
system_tests_routing_protocol
system_tests_open_properties
system_tests_http2
+ system_tests_http1_adaptor
)
add_test(${py_test_module} ${TEST_WRAP} ${PYTHON_TEST_COMMAND} -v ${py_test_module})
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
new file mode 100644
index 0000000..0e6eb81
--- /dev/null
+++ b/tests/system_tests_http1_adaptor.py
@@ -0,0 +1,722 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Test the HTTP/1.x Adaptor
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+import sys
+from threading import Thread
+try:
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+ from http.client import HTTPConnection
+ from http.client import HTTPException
+except ImportError:
+ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+ from httplib import HTTPConnection, HTTPException
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from system_test import TestCase, unittest, main_module, Qdrouterd
+from system_test import TIMEOUT, Logger
+
+
+class RequestMsg(object):
+ """
+ A 'hardcoded' HTTP request message. This class writes its request
+ message to the HTTPConnection.
+ """
+ def __init__(self, method, target, headers=None, body=None):
+ self.method = method
+ self.target = target
+ self.headers = headers or {}
+ self.body = body
+
+ def send_request(self, conn):
+ conn.putrequest(self.method, self.target)
+ for key, value in self.headers.items():
+ conn.putheader(key, value)
+ conn.endheaders()
+ if self.body:
+ conn.send(self.body)
+
+
+class ResponseMsg(object):
+ """
+ A 'hardcoded' HTTP response message. This class writes its response
+ message when called by the HTTPServer via the BaseHTTPRequestHandler
+ """
+ def __init__(self, status, version=None, reason=None,
+ headers=None, body=None, eom_close=False, error=False):
+ self.status = status
+ self.version = version or "HTTP/1.1"
+ self.reason = reason
+ self.headers = headers or []
+ self.body = body
+ self.eom_close = eom_close
+ self.error = error
+
+ def send_response(self, handler):
+ handler.protocol_version = self.version
+ if self.error:
+ handler.send_error(self.status,
+ message=self.reason,
+ explain=self.body)
+ return
+
+ handler.send_response(self.status, self.reason)
+ for key, value in self.headers.items():
+ handler.send_header(key, value)
+ handler.end_headers()
+
+ if self.body:
+ handler.wfile.write(self.body)
+ handler.wfile.flush()
+
+ return self.eom_close
+
+
+class ResponseValidator(object):
+ """
+ Validate a response as received by the HTTP client
+ """
+ def __init__(self, status=200, expect_headers=None, expect_body=None):
+ if expect_headers is None:
+ expect_headers = {}
+ self.status = status
+ self.expect_headers = expect_headers
+ self.expect_body = expect_body
+
+ def check_response(self, rsp):
+ if self.status and rsp.status != self.status:
+ raise Exception("Bad response code, expected %s got %s"
+ % (self.status, rsp.status))
+ for key, value in self.expect_headers.items():
+ if rsp.getheader(key) != value:
+ raise Exception("Missing/bad header (%s), expected %s got %s"
+ % (key, value, rsp.getheader(key)))
+
+ body = rsp.read()
+ if (self.expect_body and self.expect_body != body):
+ raise Exception("Bad response body expected %s got %s"
+ % (self.expect_body, body))
+ return body
+
+
+DEFAULT_TEST_SCENARIOS = {
+
+ #
+ # GET
+ #
+
+ "GET": [
+ (RequestMsg("GET", "/GET/error",
+ headers={"Content-Length": 0}),
+ ResponseMsg(400, reason="Bad breath", error=True),
+ ResponseValidator(status=400)),
+
+ (RequestMsg("GET", "/GET/content_len",
+ headers={"Content-Length": "00"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": 1,
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'?'),
+ ResponseValidator(expect_headers={'Content-Length': '1'},
+ expect_body=b'?')),
+
+ (RequestMsg("GET", "/GET/content_len_511",
+ headers={"Content-Length": 0}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": 511,
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'X' * 511),
+ ResponseValidator(expect_headers={'Content-Length': '511'},
+ expect_body=b'X' * 511)),
+
+ (RequestMsg("GET", "/GET/content_len_4096",
+ headers={"Content-Length": 0}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": 4096,
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'X' * 4096),
+ ResponseValidator(expect_headers={'Content-Length': '4096'},
+ expect_body=b'X' * 4096)),
+
+ (RequestMsg("GET", "/GET/chunked",
+ headers={"Content-Length": 0}),
+ ResponseMsg(200, reason="OK",
+ headers={"transfer-encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"},
+ # note: the chunk length does not count the trailing CRLF
+ body=b'16\r\n'
+ + b'Mary had a little pug \r\n'
+ + b'1b\r\n'
+ + b'Its name was "Skupper-Jack"\r\n'
+ + b'0\r\n'
+ + b'Optional: Trailer\r\n'
+ + b'Optional: Trailer\r\n'
+ + b'\r\n'),
+ ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
+ expect_body=b'Mary had a little pug Its name was "Skupper-Jack"')),
+
+ (RequestMsg("GET", "/GET/chunked_large",
+ headers={"Content-Length": 0}),
+ ResponseMsg(200, reason="OK",
+ headers={"transfer-encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"},
+ # note: the chunk length does not count the trailing CRLF
+ body=b'1\r\n'
+ + b'?\r\n'
+ + b'800\r\n'
+ + b'X' * 0x800 + b'\r\n'
+ + b'13\r\n'
+ + b'Y' * 0x13 + b'\r\n'
+ + b'0\r\n'
+ + b'Optional: Trailer\r\n'
+ + b'Optional: Trailer\r\n'
+ + b'\r\n'),
+ ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
+ expect_body=b'?' + b'X' * 0x800 + b'Y' * 0x13)),
+
+ (RequestMsg("GET", "/GET/info_content_len",
+ headers={"Content-Length": 0}),
+ [ResponseMsg(100, reason="Continue",
+ headers={"Blab": 1, "Blob": "?"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": 1,
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'?')],
+ ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
+ expect_body=b'?')),
+
+ (RequestMsg("GET", "/GET/no_length",
+ headers={"Content-Length": "0"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Type": "text/plain;charset=utf-8",
+ # ("connection", "close")
+ },
+ body=b'Hi! ' * 1024 + b'X',
+ eom_close=True),
+ ResponseValidator(expect_body=b'Hi! ' * 1024 + b'X')),
+ ],
+
+ #
+ # HEAD
+ #
+
+ "HEAD": [
+ (RequestMsg("HEAD", "/HEAD/test_01",
+ headers={"Content-Length": "0"}),
+ ResponseMsg(200, headers={"App-Header-1": "Value 01",
+ "Content-Length": "10",
+ "App-Header-2": "Value 02"},
+ body=None),
+ ResponseValidator(expect_headers={"App-Header-1": "Value 01",
+ "Content-Length": "10",
+ "App-Header-2": "Value 02"})
+ ),
+ (RequestMsg("HEAD", "/HEAD/test_02",
+ headers={"Content-Length": "0"}),
+ ResponseMsg(200, headers={"App-Header-1": "Value 01",
+ "Transfer-Encoding": "chunked",
+ "App-Header-2": "Value 02"}),
+ ResponseValidator(expect_headers={"App-Header-1": "Value 01",
+ "Transfer-Encoding": "chunked",
+ "App-Header-2": "Value 02"})),
+
+ (RequestMsg("HEAD", "/HEAD/test_03",
+ headers={"Content-Length": "0"}),
+ ResponseMsg(200, headers={"App-Header-3": "Value 03"}, eom_close=True),
+ ResponseValidator(expect_headers={"App-Header-3": "Value 03"})),
+ ],
+
+ #
+ # POST
+ #
+
+ "POST": [
+ (RequestMsg("POST", "/POST/test_01",
+ headers={"App-Header-1": "Value 01",
+ "Content-Length": "18",
+ "Content-Type": "application/x-www-form-urlencoded"},
+ body=b'one=1&two=2&three=3'),
+ ResponseMsg(200, reason="OK",
+ headers={"Response-Header": "whatever",
+ "Transfer-Encoding": "chunked"},
+ body=b'8\r\n'
+ + b'12345678\r\n'
+ + b'f\r\n'
+ + b'abcdefghijklmno\r\n'
+ + b'000\r\n'
+ + b'\r\n'),
+ ResponseValidator(expect_body=b'12345678abcdefghijklmno')
+ ),
+ (RequestMsg("POST", "/POST/test_02",
+ headers={"App-Header-1": "Value 01",
+ "Transfer-Encoding": "chunked"},
+ body=b'01\r\n'
+ + b'!\r\n'
+ + b'0\r\n\r\n'),
+ ResponseMsg(200, reason="OK",
+ headers={"Response-Header": "whatever",
+ "Content-Length": "9"},
+ body=b'Hi There!',
+ eom_close=True),
+ ResponseValidator(expect_body=b'Hi There!')
+ ),
+ ],
+
+ #
+ # PUT
+ #
+
+ "PUT": [
+ (RequestMsg("PUT", "/PUT/test_01",
+ headers={"Put-Header-1": "Value 01",
+ "Transfer-Encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'80\r\n'
+ + b'$' * 0x80 + b'\r\n'
+ + b'0\r\n\r\n'),
+ ResponseMsg(201, reason="Created",
+ headers={"Response-Header": "whatever",
+ "Content-length": "3"},
+ body=b'ABC'),
+ ResponseValidator(status=201, expect_body=b'ABC')
+ ),
+
+ (RequestMsg("PUT", "/PUT/test_02",
+ headers={"Put-Header-1": "Value 01",
+ "Content-length": "0",
+ "Content-Type": "text/plain;charset=utf-8"}),
+ ResponseMsg(201, reason="Created",
+ headers={"Response-Header": "whatever",
+ "Transfer-Encoding": "chunked"},
+ body=b'1\r\n$\r\n0\r\n\r\n',
+ eom_close=True),
+ ResponseValidator(status=201, expect_body=b'$')
+ ),
+ ]
+}
+
+
+class RequestHandler(BaseHTTPRequestHandler):
+ """
+ Dispatches requests received by the HTTPServer based on the method
+ """
+ protocol_version = 'HTTP/1.1'
+ def _execute_request(self, tests):
+ for req, resp, val in tests:
+ if req.target == self.path:
+ self._consume_body()
+ if not isinstance(resp, list):
+ resp = [resp]
+ for r in resp:
+ r.send_response(self)
+ if r.eom_close:
+ self.close_connection = 1
+ self.server.system_test_server_done = True
+ return
+ self.send_error(404, "Not Found")
+
+ def do_GET(self):
+ self._execute_request(self.server.system_tests["GET"])
+
+ def do_HEAD(self):
+ self._execute_request(self.server.system_tests["HEAD"])
+
+ def do_POST(self):
+ if self.path == "/SHUTDOWN":
+ self.close_connection = True
+ self.server.system_test_server_done = True
+ self.send_response(200, "OK")
+ self.send_header("Content-Length", "13")
+ self.end_headers()
+ self.wfile.write(b'Server Closed')
+ self.wfile.flush()
+ return
+ self._execute_request(self.server.system_tests["POST"])
+
+ def do_PUT(self):
+ self._execute_request(self.server.system_tests["PUT"])
+
+ # these overrides just quiet the test output
+ # comment them out to help debug:
+ def log_request(self, code=None, size=None):
+ pass
+
+ def log_error(self, format=None, *args):
+ pass
+
+ def log_message(self, format=None, *args):
+ pass
+
+ def _consume_body(self):
+ """
+ Read the entire body off the rfile. This must be done to allow
+ multiple requests on the same socket
+ """
+ if self.command == 'HEAD':
+ return b''
+
+ for key, value in self.headers.items():
+ if key.lower() == 'content-length':
+ return self.rfile.read(int(value))
+
+ if key.lower() == 'transfer-encoding' \
+ and 'chunked' in value.lower():
+ body = b''
+ while True:
+ header = self.rfile.readline().strip().split(b';')[0]
+ data = self.rfile.readline().rstrip()
+ body += data
+ if int(header) == 0:
+ break;
+ return body
+ return self.rfile.read()
+
+
+class MyHTTPServer(HTTPServer):
+ """
+ Adds a switch to the HTTPServer to allow it to exit gracefully
+ """
+ def __init__(self, addr, handler_cls, testcases=None):
+ if testcases is None:
+ testcases = DEFAULT_TEST_SCENARIOS
+ self.system_test_server_done = False
+ self.system_tests = testcases
+ super(MyHTTPServer, self).__init__(addr, handler_cls)
+
+
+class TestServer(object):
+ """
+ A HTTPServer running in a separate thread
+ """
+ def __init__(self, port=8080, tests=None):
+ self._logger = Logger(title="TestServer", print_to_console=False)
+ self._server_addr = ("", port)
+ self._server = MyHTTPServer(self._server_addr, RequestHandler, tests)
+ self._server.allow_reuse_address = True
+ self._thread = Thread(target=self._run)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _run(self):
+ self._logger.log("TestServer listening on %s:%s" % self._server_addr)
+ try:
+ while not self._server.system_test_server_done:
+ self._server.handle_request()
+ except Exception as exc:
+ self._logger.log("TestServer %s:%s crash: %s" %
+ (self._server_addr, exc))
+ raise
+ self._logger.log("TestServer %s:%s closed" % self._server_addr)
+
+ def wait(self, timeout=TIMEOUT):
+ self._logger.log("TestServer %s:%s shutting down" % self._server_addr)
+ self._thread.join(timeout=TIMEOUT)
+ if self._server:
+ self._server.server_close()
+
+
+class ThreadedTestClient(object):
+ """
+ An HTTP client running in a separate thread
+ """
+ def __init__(self, tests, port, repeat=1):
+ self._conn_addr = ("127.0.0.1:%s" % port)
+ self._tests = tests
+ self._repeat = repeat
+ self._logger = Logger(title="TestClient", print_to_console=False)
+ self._thread = Thread(target=self._run)
+ self._thread.daemon = True
+ self.error = None
+ self._thread.start()
+
+ def _run(self):
+ self._logger.log("TestClient connecting on %s" % self._conn_addr)
+ client = HTTPConnection(self._conn_addr, timeout=TIMEOUT)
+ for loop in range(self._repeat):
+ for op, tests in self._tests.items():
+ for req, _, val in tests:
+ self._logger.log("TestClient sending request")
+ req.send_request(client)
+ self._logger.log("TestClient getting response")
+ rsp = client.getresponse()
+ self._logger.log("TestClient response received")
+ if val:
+ try:
+ body = val.check_response(rsp)
+ except Exception as exc:
+ self._logger.log("TestClient response invalid: %s",
+ str(exc))
+ self.error = "client failed: %s" % str(exc)
+ return
+
+ if req.method is "BODY" and body != b'':
+ self._logger.log("TestClient response invalid: %s",
+ "body present!")
+ self.error = "error: body present!"
+ return
+
+ client.close()
+ self._logger.log("TestClient to %s closed" % self._conn_addr)
+
+ def wait(self, timeout=TIMEOUT):
+ self._thread.join(timeout=TIMEOUT)
+ self._logger.log("TestClient %s shut down" % self._conn_addr)
+
+
+class Http1AdaptorOneRouterTest(TestCase):
+ """
+ Test an HTTP server and client attached to a standalone router
+ """
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(Http1AdaptorOneRouterTest, cls).setUpClass()
+
+ def router(name, mode, extra):
+ config = [
+ ('router', {'mode': mode,
+ 'id': name,
+ 'allowUnsettledMulticast': 'yes'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port()}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ]
+
+ if extra:
+ config.extend(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+ return cls.routers[-1]
+
+ # configuration:
+ # interior
+ #
+ # +----------------+
+ # | INT.A |
+ # +----------------+
+ # ^ ^
+ # | |
+ # V V
+ # <client> <server>
+
+ cls.routers = []
+ cls.http_server_port = cls.tester.get_port()
+ cls.http_listener_port = cls.tester.get_port()
+
+ router('INT.A', 'standalone',
+ [('httpConnector', {'port': cls.http_server_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'}),
+ ('httpListener', {'port': cls.http_listener_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'})
+ ])
+ cls.INT_A = cls.routers[0]
+ cls.INT_A.listener = cls.INT_A.addresses[0]
+
+ def _do_request(self, tests):
+ server = TestServer(port=self.http_server_port)
+ for req, _, val in tests:
+ client = HTTPConnection("127.0.0.1:%s" % self.http_listener_port,
+ timeout=TIMEOUT)
+ req.send_request(client)
+ rsp = client.getresponse()
+ try:
+ body = val.check_response(rsp)
+ except Exception as exc:
+ self.fail("request failed: %s" % str(exc))
+
+ if req.method is "BODY":
+ self.assertEqual(b'', body)
+
+ client.close()
+ server.wait()
+
+ def test_001_get(self):
+ self._do_request(DEFAULT_TEST_SCENARIOS["GET"])
+
+ def test_002_head(self):
+ self._do_request(DEFAULT_TEST_SCENARIOS["HEAD"])
+
+ def test_003_post(self):
+ self._do_request(DEFAULT_TEST_SCENARIOS["POST"])
+
+ def test_004_put(self):
+ self._do_request(DEFAULT_TEST_SCENARIOS["PUT"])
+
+
+class Http1AdaptorInteriorTest(TestCase):
+ """
+ Test an HTTP server connected to an interior router serving multiple HTTP
+ clients
+ """
+ TESTS = {
+ "PUT": [
+ (RequestMsg("PUT", "/PUT/test",
+ headers={"Header-1": "Value",
+ "Header-2": "Value",
+ "Content-Length": "20",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'!' * 20),
+ ResponseMsg(201, reason="Created",
+ headers={"Response-Header": "data",
+ "Content-Length": "0"}),
+ ResponseValidator(status=201)
+ )],
+
+ "POST": [
+ (RequestMsg("POST", "/POST/test",
+ headers={"Header-1": "X",
+ "Content-Length": "11",
+ "Content-Type": "application/x-www-form-urlencoded"},
+ body=b'one=1' + b'&two=2'),
+ ResponseMsg(200, reason="OK",
+ headers={"Response-Header": "whatever",
+ "Content-Length": 10},
+ body=b'0123456789'),
+ ResponseValidator()
+ )],
+
+ "GET": [
+ (RequestMsg("GET", "/GET/test",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": "655",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'?' * 655),
+ ResponseValidator(expect_headers={'Content-Length': '655'},
+ expect_body=b'?' * 655)
+ )],
+
+ "PUT": [
+ (RequestMsg("PUT", "/PUT/chunked",
+ headers={"Transfer-Encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'16\r\n' + b'!' * 0x16 + b'\r\n'
+ + b'0\r\n\r\n'),
+ ResponseMsg(204, reason="No Content",
+ headers={"Content-Length": "000"}),
+ ResponseValidator(status=204)
+ )],
+ }
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(Http1AdaptorInteriorTest, cls).setUpClass()
+
+ def router(name, mode, extra):
+ config = [
+ ('router', {'mode': mode,
+ 'id': name,
+ 'allowUnsettledMulticast': 'yes'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port()}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ]
+
+ if extra:
+ config.extend(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+ return cls.routers[-1]
+
+ # configuration:
+ # one edge, one interior
+ #
+ # +-------+ +---------+
+ # | EA1 |<==>| INT.A |
+ # +-------+ +---------+
+ # ^ ^
+ # | |
+ # V V
+ # <clients> <server>
+
+ cls.routers = []
+ cls.INTA_edge_port = cls.tester.get_port()
+ cls.http_server_port = cls.tester.get_port()
+ cls.http_listener_port = cls.tester.get_port()
+
+ router('INT.A', 'interior',
+ [('listener', {'role': 'edge', 'port': cls.INTA_edge_port}),
+ ('httpConnector', {'port': cls.http_server_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'})
+ ])
+ cls.INT_A = cls.routers[0]
+ cls.INT_A.listener = cls.INT_A.addresses[0]
+
+ router('EA1', 'edge',
+ [('connector', {'name': 'uplink', 'role': 'edge',
+ 'port': cls.INTA_edge_port}),
+ ('httpListener', {'port': cls.http_listener_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'})
+ ])
+ cls.EA1 = cls.routers[1]
+ cls.EA1.listener = cls.EA1.addresses[0]
+
+ cls.EA1.wait_connectors()
+ cls.INT_A.wait_address('EA1')
+
+
+ def test_01_load(self):
+ """
+ Test multiple clients running as fast as possible
+ """
+ server = TestServer(port=self.http_server_port, tests=self.TESTS)
+
+ clients = []
+ for _ in range(5):
+ clients.append(ThreadedTestClient(self.TESTS,
+ self.http_listener_port,
+ repeat=2))
+ for client in clients:
+ client.wait()
+ self.assertIsNone(client.error)
+
+ # terminate the server thread by sending a request
+ # with eom_close set
+
+ client = ThreadedTestClient({"POST": [(RequestMsg("POST",
+ "/SHUTDOWN",
+ {"Content-Length": "0"}),
+ None,
+ None)]},
+ self.http_listener_port)
+ client.wait()
+ self.assertIsNone(client.error)
+
+ server.wait()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org