You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/27 17:05:13 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and
comments. Freed streams and connections.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new c90ff4b DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and comments. Freed streams and connections.
c90ff4b is described below
commit c90ff4bdc59c51108b163921f313e6fff8aee01d
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Aug 26 12:54:11 2020 -0400
DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and comments. Freed streams and connections.
---
python/qpid_dispatch/management/qdrouter.json | 12 +-
src/CMakeLists.txt | 2 +-
.../{http_adaptor.c => http2/http2_adaptor.c} | 620 +++++++++++----------
.../{http_adaptor.h => http2/http2_adaptor.h} | 13 +-
4 files changed, 348 insertions(+), 299 deletions(-)
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 4770cfe..c9deff4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1112,10 +1112,10 @@
"protocolVersion": {
"description": "The version of the HTTP protocol supported by this listener.",
"type": [
- "HTTP/1.x",
- "HTTP/2.0"
+ "HTTP1",
+ "HTTP2"
],
- "default": "HTTP/1.x",
+ "default": "HTTP1",
"required": false,
"create": true
}
@@ -1146,10 +1146,10 @@
"protocolVersion": {
"description": "The version of the HTTP protocol supported by this connector.",
"type": [
- "HTTP/1.x",
- "HTTP/2.0"
+ "HTTP1",
+ "HTTP2"
],
- "default": "HTTP/1.x",
+ "default": "HTTP1",
"required": false,
"create": true
}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b780011..92b0c5f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -40,7 +40,7 @@ add_custom_command (
set(qpid_dispatch_SOURCES
adaptors/reference_adaptor.c
adaptors/http_common.c
- adaptors/http_adaptor.c
+ adaptors/http2/http2_adaptor.c
adaptors/http1/http1_lib.c
adaptors/http1/http1_adaptor.c
adaptors/tcp_adaptor.c
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http2/http2_adaptor.c
similarity index 86%
rename from src/adaptors/http_adaptor.c
rename to src/adaptors/http2/http2_adaptor.c
index 2a66518..cbd1076 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include "http2_adaptor.h"
+
#include <stdio.h>
#include <inttypes.h>
@@ -28,10 +30,9 @@
#include <qpid/dispatch/buffer.h>
-#include "qpid/dispatch/protocol_adaptor.h"
-#include "delivery.h"
-#include "http_common.h"
-#include "http_adaptor.h"
+#include <qpid/dispatch/protocol_adaptor.h>
+#include "adaptors/http_common.h"
+
const char *PATH = ":path";
const char *METHOD = ":method";
@@ -139,8 +140,9 @@ qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg,
return field;
}
-void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
+static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
{
+ qdr_http2_session_data_t *session_data = stream_data->session_data;
stream_data->session_data = 0;
if (stream_data->in_link)
qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
@@ -149,6 +151,8 @@ void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
free(stream_data->reply_to);
qd_compose_free(stream_data->app_properties);
qd_compose_free(stream_data->body);
+ DEQ_REMOVE(session_data->streams, stream_data);
+ nghttp2_session_set_stream_user_data(session_data->session, stream_data->stream_id, NULL);
free_qdr_http2_stream_data_t(stream_data);
}
@@ -158,9 +162,7 @@ void free_http2_stream(qdr_http2_session_data_t *session_data, int32_t stream_id
return;
qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
- DEQ_REMOVE(session_data->streams, stream_data);
free_http2_stream_data(stream_data);
- nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);
}
static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
@@ -175,14 +177,15 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
}
}
-void free_qdr_http_connection(qdr_http_connection_t* http_conn)
+void free_qdr_http_connection(qdr_http2_connection_t* http_conn)
{
if(http_conn->remote_address) {
free(http_conn->remote_address);
http_conn->remote_address = 0;
}
if (http_conn->activate_timer) {
- //qd_timer_free(http_conn->activate_timer);
+ qd_timer_free(http_conn->activate_timer);
+ http_conn->activate_timer = 0;
}
qdr_http2_stream_data_t *stream_data = 0;
@@ -228,10 +231,13 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
const uint8_t *data,
size_t len, void *user_data)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ if (!stream_data)
+ return 0;
+
qd_buffer_list_t buffers;
DEQ_INIT(buffers);
qd_buffer_list_append(&buffers, (uint8_t *)data, len);
@@ -261,10 +267,10 @@ static int on_stream_close_callback(nghttp2_session *session,
nghttp2_error_code error_code,
void *user_data)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback, freeing stream", conn->conn_id, stream_id);
- free_http2_stream(session_data, stream_id);
+ qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ stream_data->steam_closed = true;
return 0;
}
@@ -275,7 +281,7 @@ static ssize_t send_callback(nghttp2_session *session,
size_t length,
int flags,
void *user_data) {
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
@@ -289,13 +295,14 @@ static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qdr_http2_stream_data_t *stream_data = 0;
+ // For the client applications, frame->hd.type is either NGHTTP2_HEADERS or NGHTTP2_PUSH_PROMISE
+ // TODO - deal with NGHTTP2_PUSH_PROMISE
if (frame->hd.type == NGHTTP2_HEADERS) {
if(frame->headers.cat == NGHTTP2_HCAT_REQUEST && conn->ingress) {
- // This is a brand new request.
int32_t stream_id = frame->hd.stream_id;
qdr_terminus_t *target = qdr_terminus(0);
stream_data = create_http2_stream_data(session_data, stream_id);
@@ -310,7 +317,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
QD_INCOMING,
qdr_terminus(0), //qdr_terminus_t *source,
target, //qdr_terminus_t *target,
- "tcp.ingress.in", //const char *name,
+ "http.ingress.in", //const char *name,
0, //const char *terminus_addr,
false,
NULL,
@@ -322,7 +329,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
//
//
- // TODO - Why not create something like a UUID and prefix it with the router id ?
+ // TODO - Andrew was asking if the router could expose the function that generates a dynamic address.
//
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
@@ -340,6 +347,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
else if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
int32_t stream_id = frame->hd.stream_id;
stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ // TODO - This is not right.
stream_data->message = qd_message();
}
}
@@ -366,7 +374,7 @@ static int on_header_callback(nghttp2_session *session,
void *user_data)
{
int32_t stream_id = frame->hd.stream_id;
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
@@ -385,7 +393,7 @@ static int on_header_callback(nghttp2_session *session,
}
-static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t *header_and_prop, qdr_http_connection_t *conn, bool receive_complete)
+static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t *header_and_prop, qdr_http2_connection_t *conn, bool receive_complete)
{
if (receive_complete) {
if (!stream_data->body) {
@@ -424,46 +432,52 @@ static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_compose
static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
{
qd_composed_field_t *header_and_prop = 0;
- qdr_http_connection_t *conn = stream_data->session_data->conn;
+ qdr_http2_connection_t *conn = stream_data->session_data->conn;
bool delivery_routed = false;
if (conn->ingress) {
if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
- delivery_routed = true;
header_and_prop = qd_message_compose_amqp(stream_data->message,
- conn->config->address,
- 0,
- stream_data->reply_to,
- 0, 0,
- stream_data->stream_id);
+ conn->config->address, // const char *to
+ 0, // const char *subject
+ stream_data->reply_to, // const char *reply_to
+ 0, // const char *content_type
+ 0, // const char *content_encoding
+ 0); // int32_t correlation_id
compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
+ delivery_routed = true;
}
}
else {
if (stream_data->entire_header_arrived) {
- delivery_routed = true;
header_and_prop = qd_message_compose_amqp(stream_data->message,
- stream_data->reply_to, 0, 0, 0, 0,
- stream_data->stream_id);
+ stream_data->reply_to, // const char *to
+ 0, // const char *subject
+ 0, // const char *reply_to
+ 0, // const char *content_type
+ 0, // const char *content_encoding
+ 0); // int32_t correlation_id
compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
+ delivery_routed = true;
}
}
return delivery_routed;
}
-static void write_buffers(qdr_http_connection_t *conn)
+static void write_buffers(qdr_http2_connection_t *conn)
{
qdr_http2_session_data_t *session_data = conn->session_data;
size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
- size_t num_buffs = DEQ_SIZE(session_data->buffs) > pn_buffs_to_write ? pn_buffs_to_write : DEQ_SIZE(session_data->buffs);
+ size_t qd_buffs_to_write = DEQ_SIZE(session_data->buffs);
+ size_t num_buffs = qd_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_buffs_to_write;
- //
- // No buffers to write, no need to proceed.
- //
if (num_buffs == 0) {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Zero buffers written in write_buffers - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
+ //
+ // No buffers to write, cannot proceed.
+ //
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written 0 buffers in write_buffers() - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
return;
}
@@ -488,9 +502,9 @@ static void write_buffers(qdr_http_connection_t *conn)
if (i >0) {
size_t num_buffers_written = pn_raw_connection_write_buffers(session_data->conn->pn_raw_conn, raw_buffers, num_buffs);
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in pn_raw_connection_write_buffers", conn->conn_id, num_buffers_written, total_bytes);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
if (num_buffs != num_buffers_written) {
- assert(false);
+ //TODO - This is not good.
}
}
}
@@ -506,7 +520,7 @@ static void write_buffers(qdr_http_connection_t *conn)
//}
-static void send_settings_frame(qdr_http_connection_t *conn)
+static void send_settings_frame(qdr_http2_connection_t *conn)
{
qdr_http2_session_data_t *session_data = conn->session_data;
nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
@@ -516,7 +530,7 @@ static void send_settings_frame(qdr_http_connection_t *conn)
// You must call nghttp2_session_send after calling nghttp2_submit_settings
int rv = nghttp2_submit_settings(session_data->session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
if (rv != 0) {
- qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
+ qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
return;
}
qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Initial SETTINGS frame sent", conn->conn_id);
@@ -528,12 +542,15 @@ static void send_settings_frame(qdr_http_connection_t *conn)
static int on_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
int32_t stream_id = frame->hd.stream_id;
qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ if (!stream_data)
+ return 0;
+
switch (frame->hd.type) {
case NGHTTP2_SETTINGS: {
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
@@ -601,171 +618,6 @@ static int on_frame_recv_callback(nghttp2_session *session,
return 0;
}
-qdr_http_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
-{
- qdr_http_connection_t* ingress_http_conn = NEW(qdr_http_connection_t);
- ingress_http_conn->ingress = true;
- ingress_http_conn->context.context = ingress_http_conn;
- ingress_http_conn->context.handler = &handle_connection_event;
- ingress_http_conn->config = &(listener->config);
- ingress_http_conn->server = listener->server;
- ingress_http_conn->pn_raw_conn = pn_raw_connection();
-
- ingress_http_conn->session_data = new_qdr_http2_session_data_t();
- ZERO(ingress_http_conn->session_data);
- DEQ_INIT(ingress_http_conn->session_data->buffs);
- DEQ_INIT(ingress_http_conn->session_data->streams);
- ingress_http_conn->session_data->conn = ingress_http_conn;
-
- nghttp2_session_server_new(&(ingress_http_conn->session_data->session), (nghttp2_session_callbacks*)http_adaptor->callbacks, ingress_http_conn);
-
- pn_raw_connection_set_context(ingress_http_conn->pn_raw_conn, ingress_http_conn);
- pn_listener_raw_accept(listener->pn_listener, ingress_http_conn->pn_raw_conn);
- ingress_http_conn->connection_established = true;
- send_settings_frame(ingress_http_conn);
- return ingress_http_conn;
-}
-
-static void grant_read_buffers(qdr_http_connection_t *conn)
-{
- pn_raw_buffer_t raw_buffers[READ_BUFFERS];
- // Give proactor more read buffers for the pn_raw_conn
- if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
- size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
- while (desired) {
- size_t i;
- for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
- qd_buffer_t *buf = qd_buffer();
- raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
- raw_buffers[i].capacity = qd_buffer_capacity(buf);
- raw_buffers[i].size = 0;
- raw_buffers[i].offset = 0;
- raw_buffers[i].context = (uintptr_t) buf;
- }
- desired -= i;
- pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
- }
- }
-}
-
-
-static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
-{
-}
-
-
-static void qdr_http_flow(void *context, qdr_link_t *link, int credit)
-{
-}
-
-
-static void qdr_http_offer(void *context, qdr_link_t *link, int delivery_count)
-{
-}
-
-
-static void qdr_http_drained(void *context, qdr_link_t *link)
-{
-}
-
-
-static void qdr_http_drain(void *context, qdr_link_t *link, bool mode)
-{
-}
-
-static int qdr_http_get_credit(void *context, qdr_link_t *link)
-{
- return 10;
-}
-
-
-static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
-{
-}
-
-
-static void qdr_http_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
-{
-}
-
-
-static void qdr_http_conn_trace(void *context, qdr_connection_t *conn, bool trace)
-{
-}
-
-
-static void qdr_http_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target,
- qd_session_class_t session_class)
-{
-}
-
-
-static void qdr_copy_reply_to(qdr_http2_stream_data_t* stream_data, qd_iterator_t* reply_to)
-{
- int length = qd_iterator_length(reply_to);
- stream_data->reply_to = malloc(length + 1);
- qd_iterator_strncpy(reply_to, stream_data->reply_to, length + 1);
-}
-
-
-static void qdr_http_second_attach(void *context, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target)
-{
- qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t*)qdr_link_get_context(link);
- if (stream_data) {
- if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
- if (stream_data->session_data->conn->ingress) {
- qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to is available now, trying to route delivery", stream_data->session_data->conn->conn_id);
- if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
- }
- else {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery not routed", stream_data->session_data->conn->conn_id);
- }
- grant_read_buffers(stream_data->session_data->conn);
- }
- qdr_link_flow(http_adaptor->core, link, 10, false);
- }
- }
-}
-
-static void qdr_http_activate(void *notused, qdr_connection_t *c)
-{
- qdr_http_connection_t* conn = (qdr_http_connection_t*) qdr_connection_get_context(c);
- if (conn) {
- if (conn->pn_raw_conn) {
- qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
- pn_raw_connection_wake(conn->pn_raw_conn);
- } else if (conn->activate_timer) {
- // On egress, the raw connection is only created once the
- // first part of the message encapsulating the
- // client->server half of the stream has been
- // received. Prior to that however a subscribing link (and
- // its associated connection must be setup), for which we
- // fake wakeup by using a timer.
- qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, no socket yet so scheduling timer", conn->conn_id);
- qd_timer_schedule(conn->activate_timer, 0);
- } else {
- qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
- }
- }
-}
-
-static int qdr_http_push(void *context, qdr_link_t *link, int limit)
-{
- return qdr_link_process_deliveries(http_adaptor->core, link, limit);
-}
-
-
-static void http_connector_establish(qdr_http_connection_t *conn)
-{
- qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config->host_port);
- conn->pn_raw_conn = pn_raw_connection();
- pn_raw_connection_set_context(conn->pn_raw_conn, conn);
- pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->pn_raw_conn, conn->config->host_port);
-}
ssize_t read_callback(nghttp2_session *session,
int32_t stream_id, uint8_t *buf,
@@ -773,13 +625,13 @@ ssize_t read_callback(nghttp2_session *session,
nghttp2_data_source *source,
void *user_data)
{
- qdr_link_t *link = source->ptr;
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
qd_message_depth_status_t status = qd_message_check_depth(stream_data->message, QD_DEPTH_BODY);
+ //TODO - Discuss with team
write_buffers(session_data->conn);
switch (status) {
@@ -830,9 +682,6 @@ ssize_t read_callback(nghttp2_session *session,
if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write==0) {
// We cannot send anything, we need to come back here.
- //TODO - This will not pass code review. Need to investigate.
- link->credit_to_core = 0;
- qdr_link_flow(http_adaptor->core, link, 1, false);
if (stream_data->body_data_buff_count == 0) {
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
qd_message_body_data_release(stream_data->curr_body_data);
@@ -859,6 +708,8 @@ ssize_t read_callback(nghttp2_session *session,
qd_message_body_data_buffers(body_data, raw_buffers, buff_offset, 1);
stream_data->curr_body_data_buff_offset += 1;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, raw_buffers[0].size);
+
+ // TODO - There is a bug here. I have to make sure that the raw buffer size is less than the length.
memcpy(buf, raw_buffers[0].bytes, raw_buffers[0].size);
stream_data->body_data_buff_count -= 1;
if (!stream_data->body_data_buff_count) {
@@ -894,7 +745,6 @@ ssize_t read_callback(nghttp2_session *session,
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - send is complete, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
}
- qdr_link_flow(http_adaptor->core, link, 1, false);
break;
}
@@ -904,7 +754,6 @@ ssize_t read_callback(nghttp2_session *session,
//
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
qd_message_body_data_release(stream_data->curr_body_data);
- qdr_link_flow(http_adaptor->core, link, 1, false);
stream_data->disposition = PN_REJECTED;
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
break;
@@ -915,7 +764,6 @@ ssize_t read_callback(nghttp2_session *session,
//
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
qd_message_body_data_release(stream_data->curr_body_data);
- qdr_link_flow(http_adaptor->core, link, 1, false);
stream_data->disposition = PN_REJECTED;
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
break;
@@ -924,7 +772,6 @@ ssize_t read_callback(nghttp2_session *session,
}
case QD_MESSAGE_DEPTH_INVALID:
- qdr_link_flow(http_adaptor->core, link, 1, false);
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INVALID", conn->conn_id, stream_data->stream_id);
stream_data->disposition = PN_REJECTED;
break;
@@ -937,10 +784,177 @@ ssize_t read_callback(nghttp2_session *session,
return 0;
}
-uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *link)
+
+
+qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
+{
+ qdr_http2_connection_t* ingress_http_conn = NEW(qdr_http2_connection_t);
+ ingress_http_conn->ingress = true;
+ ingress_http_conn->context.context = ingress_http_conn;
+ ingress_http_conn->context.handler = &handle_connection_event;
+ ingress_http_conn->config = &(listener->config);
+ ingress_http_conn->server = listener->server;
+ ingress_http_conn->pn_raw_conn = pn_raw_connection();
+
+ ingress_http_conn->session_data = new_qdr_http2_session_data_t();
+ ZERO(ingress_http_conn->session_data);
+ DEQ_INIT(ingress_http_conn->session_data->buffs);
+ DEQ_INIT(ingress_http_conn->session_data->streams);
+ ingress_http_conn->session_data->conn = ingress_http_conn;
+ ingress_http_conn->data_prd.read_callback = read_callback;
+
+ nghttp2_session_server_new(&(ingress_http_conn->session_data->session), (nghttp2_session_callbacks*)http_adaptor->callbacks, ingress_http_conn);
+ pn_raw_connection_set_context(ingress_http_conn->pn_raw_conn, ingress_http_conn);
+ pn_listener_raw_accept(listener->pn_listener, ingress_http_conn->pn_raw_conn);
+ return ingress_http_conn;
+}
+
+static void grant_read_buffers(qdr_http2_connection_t *conn)
+{
+ pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+ // Give proactor more read buffers for the pn_raw_conn
+ if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
+ size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+ while (desired) {
+ size_t i;
+ for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+ qd_buffer_t *buf = qd_buffer();
+ raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
+ raw_buffers[i].capacity = qd_buffer_capacity(buf);
+ raw_buffers[i].size = 0;
+ raw_buffers[i].offset = 0;
+ raw_buffers[i].context = (uintptr_t) buf;
+ }
+ desired -= i;
+ pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
+ }
+ }
+}
+
+
+static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+}
+
+
+static void qdr_http_flow(void *context, qdr_link_t *link, int credit)
+{
+}
+
+
+static void qdr_http_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qdr_http_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qdr_http_drain(void *context, qdr_link_t *link, bool mode)
+{
+}
+
+static int qdr_http_get_credit(void *context, qdr_link_t *link)
+{
+ return 10;
+}
+
+
+static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+}
+
+
+static void qdr_http_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+}
+
+
+static void qdr_http_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+}
+
+
+static void qdr_http_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target,
+ qd_session_class_t session_class)
+{
+}
+
+
+static void qdr_copy_reply_to(qdr_http2_stream_data_t* stream_data, qd_iterator_t* reply_to)
+{
+ int length = qd_iterator_length(reply_to);
+ stream_data->reply_to = malloc(length + 1);
+ qd_iterator_strncpy(reply_to, stream_data->reply_to, length + 1);
+}
+
+
+static void qdr_http_second_attach(void *context, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target)
+{
+ qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t*)qdr_link_get_context(link);
+ if (stream_data) {
+ if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
+ if (stream_data->session_data->conn->ingress) {
+ qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
+ if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
+ }
+ else {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available but delivery not routed", stream_data->session_data->conn->conn_id);
+ }
+ grant_read_buffers(stream_data->session_data->conn);
+ }
+ qdr_link_flow(http_adaptor->core, link, 10, false);
+ }
+ }
+}
+
+static void qdr_http_activate(void *notused, qdr_connection_t *c)
+{
+ qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c);
+ if (conn) {
+ if (conn->pn_raw_conn) {
+ qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
+ pn_raw_connection_wake(conn->pn_raw_conn);
+ }
+ else if (conn->activate_timer) {
+ // On egress, the raw connection is only created once the
+ // first part of the message encapsulating the
+ // client->server half of the stream has been
+ // received. Prior to that however a subscribing link (and
+ // its associated connection must be setup), for which we
+ // fake wakeup by using a timer.
+ qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, no socket yet so scheduling timer", conn->conn_id);
+ qd_timer_schedule(conn->activate_timer, 0);
+ } else {
+ qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
+ }
+ }
+}
+
+static int qdr_http_push(void *context, qdr_link_t *link, int limit)
+{
+ return qdr_link_process_deliveries(http_adaptor->core, link, limit);
+}
+
+
+static void http_connector_establish(qdr_http2_connection_t *conn)
+{
+ qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config->host_port);
+ conn->pn_raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(conn->pn_raw_conn, conn);
+ pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->pn_raw_conn, conn->config->host_port);
+}
+
+
+uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
{
qdr_http2_session_data_t *session_data = stream_data->session_data;
- qdr_http_connection_t *conn = session_data->conn;
+ qdr_http2_connection_t *conn = session_data->conn;
qd_message_t *message = stream_data->message;
if (stream_data->out_dlv) {
@@ -965,8 +979,6 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
nghttp2_nv hdrs[count];
- int stream_id = stream_data->session_data->conn->ingress?stream_data->stream_id: -1;
-
for (uint32_t idx = 0; idx < count; idx++) {
qd_parsed_field_t *key = qd_parse_sub_key(app_properties_fld, idx);
qd_parsed_field_t *val = qd_parse_sub_value(app_properties_fld, idx);
@@ -980,13 +992,18 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
hdrs[idx].flags = NGHTTP2_NV_FLAG_NONE;
}
+ int stream_id = stream_data->session_data->conn->ingress?stream_data->stream_id: -1;
+
// This does not really submit the request. We need to read the bytes
//nghttp2_session_set_next_stream_id(session_data->session, stream_data->stream_id);
stream_data->stream_id = nghttp2_submit_headers(session_data->session,
0,
- stream_id, NULL, hdrs,
+ stream_id,
+ NULL,
+ hdrs,
count,
stream_data);
+
if (stream_id != -1) {
stream_data->stream_id = stream_id;
}
@@ -1012,8 +1029,6 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
}
else {
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
- conn->data_prd.read_callback = read_callback;
- conn->data_prd.source.ptr = link;
int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
if (rv != 0) {
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
@@ -1036,18 +1051,13 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
{
qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t*)qdr_link_get_context(link);
- if (!stream_data)
- return 0;
+ assert(stream_data);
- qdr_http_connection_t *conn = stream_data->session_data->conn;
+ qdr_http2_connection_t *conn = stream_data->session_data->conn;
if (link == stream_data->session_data->conn->stream_dispatcher) {
qd_message_t *msg = qdr_delivery_message(delivery);
- qd_iterator_t *iter = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
- qd_parsed_field_t *cid_field = qd_parse(iter);
- uint32_t stream_id = qd_parse_as_int(cid_field);
-
- qdr_http2_stream_data_t *stream_data = create_http2_stream_data(conn->session_data, stream_id);
+ qdr_http2_stream_data_t *stream_data = create_http2_stream_data(conn->session_data, 0);
stream_data->message = qdr_delivery_message(delivery);
stream_data->out_dlv = delivery;
@@ -1057,7 +1067,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
QD_OUTGOING,
source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
- "tcp.egress.out", //const char *name,
+ "http.egress.out", //const char *name,
0, //const char *terminus_addr,
true,
delivery,
@@ -1083,7 +1093,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qdr_link_set_context(stream_data->in_link, stream_data);
//Let's make an outbound connection to the configured connector.
- qdr_http_connection_t *conn = stream_data->session_data->conn;
+ qdr_http2_connection_t *conn = stream_data->session_data->conn;
if (!conn->connection_established) {
if (!conn->ingress) {
http_connector_establish(conn);
@@ -1096,9 +1106,8 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
stream_data->message = qdr_delivery_message(delivery);
stream_data->out_dlv = delivery;
}
- return handle_outgoing_http(stream_data, link);
+ return handle_outgoing_http(stream_data);
}
- qdr_link_flow(http_adaptor->core, link, 1, false);
}
return 0;
}
@@ -1113,7 +1122,7 @@ void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector
}
-static int handle_incoming_http(qdr_http_connection_t *conn)
+static int handle_incoming_http(qdr_http2_connection_t *conn)
{
qd_buffer_list_t buffers;
DEQ_INIT(buffers);
@@ -1121,6 +1130,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
size_t n;
int count = 0;
+ // TODO - Remove after using connection safe pointers.
if (!conn->pn_raw_conn)
return 0;
@@ -1136,7 +1146,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
}
//
- // Read each buffer in the buffer chain and call nghttp2_session_mem_recv with each buffer content
+ // Read each buffer in the buffer chain and call nghttp2_session_mem_recv with buffer content
//
qd_buffer_t *buf = DEQ_HEAD(buffers);
qd_buffer_t *curr_buf = 0;
@@ -1154,7 +1164,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
}
-qdr_http_connection_t *qdr_http_connection_ingress_accept(qdr_http_connection_t* ingress_http_conn)
+qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_t* ingress_http_conn)
{
ingress_http_conn->remote_address = get_address_string(ingress_http_conn->pn_raw_conn);
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
@@ -1194,28 +1204,54 @@ qdr_http_connection_t *qdr_http_connection_ingress_accept(qdr_http_connection_t*
ingress_http_conn->qdr_conn = conn;
ingress_http_conn->conn_id = conn->identity;
qdr_connection_set_context(conn, ingress_http_conn);
- //grant_read_buffers(ingress_http_conn);
+ ingress_http_conn->connection_established = true;
return ingress_http_conn;
}
+static void restart_streams(qdr_http2_connection_t *http_conn)
+{
+ qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
+ while (stream_data) {
+ if (stream_data->steam_closed) {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] freeing stream", http_conn->conn_id, stream_data->stream_id);
+ qdr_http2_stream_data_t *next_stream_data = DEQ_NEXT(stream_data);
+ handle_outgoing_http(stream_data);
+ free_http2_stream_data(stream_data);
+ stream_data = next_stream_data;
+ }
+ else {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] Restarting stream in restart_streams()", http_conn->conn_id, stream_data->stream_id);
+ handle_outgoing_http(stream_data);
+ stream_data = DEQ_NEXT(stream_data);
+ }
+ }
+}
+
+
static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t*) context;
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t*) context;
qd_log_source_t *log = http_adaptor->log_source;
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
if (conn->ingress) {
+ send_settings_frame(conn);
qdr_http_connection_ingress_accept(conn);
- qd_log(log, QD_LOG_INFO, "[C%i] Accepted from %s", conn->conn_id, conn->remote_address);
+ qd_log(log, QD_LOG_INFO, "[C%i] Accepted ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
} else {
- qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
+ qd_log(log, QD_LOG_INFO, "[C%i] Connected (PN_RAW_CONNECTION_CONNECTED)", conn->conn_id);
conn->connection_established = true;
qdr_connection_process(conn->qdr_conn);
}
break;
}
case PN_RAW_CONNECTION_CLOSED_READ: {
+ printf ("PN_RAW_CONNECTION_CLOSED_READ\n");
+ fflush(stdout);
+ qdr_connection_set_context(conn->qdr_conn, 0);
+ qdr_connection_closed(conn->qdr_conn);
+ conn->qdr_conn = 0;
pn_raw_connection_close(conn->pn_raw_conn);
conn->pn_raw_conn = 0;
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
@@ -1227,7 +1263,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
- qdr_connection_closed(conn->qdr_conn);
+ if (conn->qdr_conn)
+ qdr_connection_closed(conn->qdr_conn);
free_qdr_http_connection(conn);
break;
}
@@ -1246,10 +1283,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
case PN_RAW_CONNECTION_READ: {
- qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ", conn->conn_id);
int read = handle_incoming_http(conn);
- qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
- while (qdr_connection_process(conn->qdr_conn)) {}
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read);
+ //while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_WRITTEN: {
@@ -1261,12 +1297,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
written += buffs[i].size;
qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
assert(qd_buff);
- if (qd_buff)
+ if (qd_buff != NULL)
qd_buffer_free(qd_buff);
}
}
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes", conn->conn_id, written);
- while (qdr_connection_process(conn->qdr_conn)) {}
+ restart_streams(conn);
break;
}
default:
@@ -1315,6 +1351,9 @@ static bool http_listener_listen(qd_http_lsnr_t *li) {
qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
+ // TODO - Add lws in the qd_lws_listener.
+ // TOOD - Separate commit.
+ // TODO - use http2 in function names
qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &handle_listener_event);
if (!li) {
qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: no memory");
@@ -1332,13 +1371,13 @@ qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bri
void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener)
{
- // TBD?
+ // TODO - Not implemented yet.
}
static void on_activate(void *context)
{
- qdr_http_connection_t* conn = (qdr_http_connection_t*) context;
+ qdr_http2_connection_t* conn = (qdr_http2_connection_t*) context;
qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] on_activate", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
@@ -1346,36 +1385,34 @@ static void on_activate(void *context)
-qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector)
+qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector)
{
- qdr_http_connection_t* egress_conn = NEW(qdr_http_connection_t);
- ZERO(egress_conn);
- //FIXME: this is only needed while waiting for raw_connection_wake
- //functionality in proton
- egress_conn->activate_timer = qd_timer(http_adaptor->core->qd, on_activate, egress_conn);
-
- egress_conn->ingress = false;
- egress_conn->context.context = egress_conn;
- egress_conn->context.handler = &handle_connection_event;
- egress_conn->config = &(connector->config);
- egress_conn->server = connector->server;
- egress_conn->data_prd.read_callback = read_callback;
-
- egress_conn->session_data = new_qdr_http2_session_data_t();
- ZERO(egress_conn->session_data);
- DEQ_INIT(egress_conn->session_data->buffs);
- DEQ_INIT(egress_conn->session_data->streams);
- egress_conn->session_data->conn = egress_conn;
-
- nghttp2_session_client_new(&egress_conn->session_data->session, (nghttp2_session_callbacks*)http_adaptor->callbacks, (void *)egress_conn);
-
- //pn_raw_connection_set_context(egress_conn->pn_raw_conn, egress_conn);
+ // TODO - Make this a pooled object.
+ qdr_http2_connection_t* egress_http_conn = NEW(qdr_http2_connection_t);
+ ZERO(egress_http_conn);
+ egress_http_conn->activate_timer = qd_timer(http_adaptor->core->qd, on_activate, egress_http_conn);
+
+ egress_http_conn->ingress = false;
+ egress_http_conn->context.context = egress_http_conn;
+ egress_http_conn->context.handler = &handle_connection_event;
+ egress_http_conn->config = &(connector->config);
+ egress_http_conn->server = connector->server;
+ egress_http_conn->data_prd.read_callback = read_callback;
+
+ egress_http_conn->session_data = new_qdr_http2_session_data_t();
+ ZERO(egress_http_conn->session_data);
+ DEQ_INIT(egress_http_conn->session_data->buffs);
+ DEQ_INIT(egress_http_conn->session_data->streams);
+ egress_http_conn->session_data->conn = egress_http_conn;
+
+ nghttp2_session_client_new(&egress_http_conn->session_data->session, (nghttp2_session_callbacks*)http_adaptor->callbacks, (void *)egress_http_conn);
+
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_OUTGOING, //qd_direction_t dir,
- egress_conn->config->host_port, //const char *host,
+ egress_http_conn->config->host_port, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
@@ -1391,7 +1428,7 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
true,
QDR_ROLE_NORMAL,
1,
- qd_server_allocate_connection_id(egress_conn->server),
+ qd_server_allocate_connection_id(egress_http_conn->server),
0,
0,
false,
@@ -1403,13 +1440,18 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
info,
0,
0);
- egress_conn->qdr_conn = conn;
- egress_conn->conn_id = conn->identity;
- qdr_connection_set_context(conn, egress_conn);
+ egress_http_conn->qdr_conn = conn;
+
+ //
+ // Using the same identity as the qdr_connection's identity. This is really helpful
+ // when we try to trace the life cycle of a delivery.
+ //
+ egress_http_conn->conn_id = conn->identity;
+ qdr_connection_set_context(conn, egress_http_conn);
qdr_terminus_t *source = qdr_terminus(0);
- qdr_terminus_set_address(source, egress_conn->config->address);
- egress_conn->stream_dispatcher = qdr_link_first_attach(conn,
+ qdr_terminus_set_address(source, egress_http_conn->config->address);
+ egress_http_conn->stream_dispatcher = qdr_link_first_attach(conn,
QD_OUTGOING,
source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
@@ -1417,18 +1459,17 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
0, //const char *terminus_addr,
false,
0,
- &(egress_conn->stream_dispatcher_id));
- // Create a dummy stream_data object and set that as context
+ &(egress_http_conn->stream_dispatcher_id));
+
+ // Create a dummy stream_data object and set that as context.
qdr_http2_stream_data_t *stream_data = new_qdr_http2_stream_data_t();
ZERO(stream_data);
-
stream_data->session_data = new_qdr_http2_session_data_t();
ZERO(stream_data->session_data);
- stream_data->stream_id = 0;
- stream_data->session_data->conn = egress_conn;
+ stream_data->session_data->conn = egress_http_conn;
- qdr_link_set_context(egress_conn->stream_dispatcher, stream_data);
- return egress_conn;
+ qdr_link_set_context(egress_http_conn->stream_dispatcher, stream_data);
+ return egress_http_conn;
}
@@ -1470,8 +1511,8 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
adaptor->core = core;
adaptor->adaptor = qdr_protocol_adaptor(core,
"http", // name
- adaptor, // context
- qdr_http_activate, // activate
+ adaptor, // context
+ qdr_http_activate,
qdr_http_first_attach,
qdr_http_second_attach,
qdr_http_detach,
@@ -1491,6 +1532,9 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
DEQ_INIT(adaptor->listeners);
DEQ_INIT(adaptor->connectors);
+ //
+ // Register all nghttp2 callbacks.
+ //
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http2/http2_adaptor.h
similarity index 90%
rename from src/adaptors/http_adaptor.h
rename to src/adaptors/http2/http2_adaptor.h
index 53a05b1..33fa3f4 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -24,6 +24,10 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/log.h>
#include <nghttp2/nghttp2.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+
+#include "server_private.h"
+#include "adaptors/http_common.h"
// We already have a qd_http_listener_t defined in http-libwebsockets.c
@@ -32,11 +36,11 @@
// and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
typedef struct qdr_http2_stream_data_t qdr_http2_stream_data_t;
-typedef struct qdr_http_connection_t qdr_http_connection_t;
+typedef struct qdr_http2_connection_t qdr_http2_connection_t;
DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
struct qdr_http2_session_data_t {
- qdr_http_connection_t *conn; // Connection associated with the session_data
+ qdr_http2_connection_t *conn; // Connection associated with the session_data
nghttp2_session *session; // A pointer to the nghttp2s' session object
qd_http2_stream_data_list_t streams; // A session can have many streams.
qd_buffer_list_t buffs; // Buffers for writing
@@ -63,11 +67,12 @@ struct qdr_http2_stream_data_t {
int body_data_buff_count;
int32_t stream_id;
- bool entire_header_arrived; // true if all the headershave arrived, just before the start of the data frame or just before the END_STREAM.
+ bool entire_header_arrived;
bool header_sent;
+ bool steam_closed;
};
-struct qdr_http_connection_t {
+struct qdr_http2_connection_t {
qd_handler_context_t context;
qdr_connection_t *qdr_conn;
pn_raw_connection_t *pn_raw_conn;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org