You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/03/17 14:50:37 UTC
[qpid-dispatch] branch master updated: DISPATCH-1979: Added code to
capture the footer of the GRPC message in case it arrives before credit is
available to send the message. Also code to create new nghttp2 session when
a new connection is opened. This closes #1057.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 3fa6135 DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.
3fa6135 is described below
commit 3fa6135dfba7926b966ad3c1baa567002e22a9a6
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Sun Feb 14 19:58:41 2021 -0500
DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.
---
include/qpid/dispatch/message.h | 1 +
src/adaptors/http2/http2_adaptor.c | 173 +++++++++++++++++++++++++++++--------
src/adaptors/http2/http2_adaptor.h | 1 -
src/message.c | 17 ++++
4 files changed, 155 insertions(+), 37 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 1558dde..a580701 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -303,6 +303,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool receive_complete);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, bool receive_complete);
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3, bool receive_complete);
+void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete);
/**
* qd_message_extend
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index db7261c..5735b5c 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -69,6 +69,18 @@ static qdr_http2_adaptor_t *http2_adaptor;
static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context);
static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data);
+static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on_shutdown);
+
+static void free_all_connection_streams(qdr_http2_connection_t *http_conn, bool on_shutdown)
+{
+    //
+    // Release every stream that is still attached to this connection's session.
+    // The head of the list is re-read after each free, so the loop ends only once the list is
+    // empty (presumably free_http2_stream_data() unlinks the stream it is handed - confirm).
+    //
+    qdr_http2_stream_data_t *head = 0;
+    while ((head = DEQ_HEAD(http_conn->session_data->streams)) != 0) {
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", head->session_data->conn->conn_id, head->stream_id);
+        free_http2_stream_data(head, on_shutdown);
+    }
+}
static void set_stream_data_delivery_flags(qdr_http2_stream_data_t * stream_data, qdr_delivery_t *dlv) {
if (dlv == stream_data->in_dlv) {
@@ -345,12 +357,7 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdown)
{
// Free all the stream data associated with this connection/session.
- qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
- while (stream_data) {
- qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id);
- free_http2_stream_data(stream_data, on_shutdown);
- stream_data = DEQ_HEAD(http_conn->session_data->streams);
- }
+ free_all_connection_streams(http_conn, on_shutdown);
if(http_conn->remote_address) {
free(http_conn->remote_address);
@@ -368,9 +375,11 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo
http_conn->context.context = 0;
- nghttp2_session_del(http_conn->session_data->session);
+ if (http_conn->session_data->session)
+ nghttp2_session_del(http_conn->session_data->session);
free_qdr_http2_session_data_t(http_conn->session_data);
+ http_conn->session_data = 0;
sys_mutex_lock(http2_adaptor->lock);
DEQ_REMOVE(http2_adaptor->connections, http_conn);
sys_mutex_unlock(http2_adaptor->lock);
@@ -528,14 +537,13 @@ static int snd_data_callback(nghttp2_session *session,
qdr_http2_session_data_t *session_data = conn->session_data;
qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
- qd_http2_buffer_t *http2_buff = qd_http2_buffer();
- DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
- // Insert the framehd of length 9 bytes into the buffer
- memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
- qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
-
int bytes_sent = 0; // This should not include the header length of 9.
if (length) {
+ qd_http2_buffer_t *http2_buff = qd_http2_buffer();
+ DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
+ // Insert the framehd of length 9 bytes into the buffer
+ memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+ qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
@@ -574,11 +582,10 @@ static int snd_data_callback(nghttp2_session *session,
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
- if (length)
+ if (length) {
assert(bytes_sent == length);
-
- write_buffers(conn);
-
+ write_buffers(conn);
+ }
return 0;
@@ -745,16 +752,37 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d
qd_compose_insert_binary(stream_data->body, 0, 0);
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id);
}
- qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+
+ if (stream_data->footer_properties) {
+ qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+ }
+ else {
+ qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+ }
}
else {
if (stream_data->body) {
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has stream_data->body in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
- qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+ if (stream_data->footer_properties) {
+ qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+ }
+ else {
+ qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+ }
stream_data->body_data_added = true;
}
else {
- qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
+
+ if (stream_data->footer_properties) {
+ //
+ // The footer has already arrived but there was no body. Insert an empty body
+ //
+ stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+ }
+ else {
+ qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
+ }
}
}
@@ -990,8 +1018,15 @@ static int on_frame_recv_callback(nghttp2_session *session,
}
if (stream_data->entire_footer_arrived) {
- qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
+ if (stream_data->in_dlv) {
+ qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
+ }
+ else {
+ if (route_delivery(stream_data, receive_complete)) {
+ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, delivery routed successfully (on_frame_recv_callback)", conn->conn_id, stream_id);
+ }
+ }
}
else {
//
@@ -1351,6 +1386,25 @@ static int qdr_http_get_credit(void *context, qdr_link_t *link)
}
+/**
+ * nghttp2 data-source read callback that supplies a short plain-text error body.
+ *
+ * source->ptr is read as a NUL-terminated string (or may be NULL, in which case the body is empty).
+ * At most 'length' bytes are copied into 'buf'; the end-of-data flag is always set, so the callback
+ * is only expected to be invoked once per response and a message longer than 'length' is truncated.
+ *
+ * @param session    unused
+ * @param stream_id  unused
+ * @param buf        destination buffer provided by nghttp2
+ * @param length     capacity of buf in bytes
+ * @param data_flags out: NGHTTP2_DATA_FLAG_EOF is OR-ed in
+ * @param source     data source whose ptr holds the error message
+ * @param user_data  unused
+ * @return the number of bytes written to buf
+ */
+ssize_t error_read_callback(nghttp2_session *session,
+                            int32_t stream_id,
+                            uint8_t *buf,
+                            size_t length,
+                            uint32_t *data_flags,
+                            nghttp2_data_source *source,
+                            void *user_data)
+{
+    size_t len = 0;
+    const char *error_msg = (const char *) source->ptr;
+    if (error_msg) {
+        len = strlen(error_msg);
+        //
+        // Never write more than the capacity nghttp2 gave us; the original copied strlen() bytes
+        // unconditionally, which could overrun buf.
+        //
+        if (len > length)
+            len = length;
+        if (len > 0)
+            memcpy(buf, error_msg, len);
+    }
+    *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+    return (ssize_t) len;
+}
+
static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
qdr_http2_stream_data_t* stream_data = qdr_delivery_get_context(dlv);
@@ -1377,31 +1431,49 @@ static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_
}
if (settled) {
- nghttp2_nv hdrs[1];
+ nghttp2_nv hdrs[2];
if (conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
if (disp == PN_RELEASED || disp == PN_MODIFIED) {
+ uint8_t * error_msg = (uint8_t *)"Service Unavailable";
hdrs[0].name = (uint8_t *)":status";
hdrs[0].value = (uint8_t *)"503";
hdrs[0].namelen = 7;
hdrs[0].valuelen = 3;
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
- nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
+
+ hdrs[1].name = (uint8_t *)":content-type";
+ hdrs[1].value = (uint8_t *)"text/plain";
+ hdrs[1].namelen = 13;
+ hdrs[1].valuelen = 10;
+ hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
+
+ nghttp2_data_provider data_prd;
+ data_prd.read_callback = error_read_callback;
+ data_prd.source.ptr = error_msg;
+
+ nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
+ nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, error_msg, 19);
}
else if (disp == PN_REJECTED) {
+ uint8_t * error_msg = (uint8_t *)"Resource Unavailable";
hdrs[0].name = (uint8_t *)":status";
hdrs[0].value = (uint8_t *)"400";
hdrs[0].namelen = 7;
hdrs[0].valuelen = 3;
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
- nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
- }
- //
- // Send a GOAWAY frame on the client/ingress connection.
- // The GOAWAY frame is used to initiate shutdown of a connection or to signal serious error conditions.
- //
- nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, (uint8_t *)"Service Unavailable", 19);
+ hdrs[1].name = (uint8_t *)":content-type";
+ hdrs[1].value = (uint8_t *)"text/plain";
+ hdrs[1].namelen = 13;
+ hdrs[1].valuelen = 10;
+ hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
+
+ nghttp2_data_provider data_prd;
+ data_prd.read_callback = error_read_callback;
+ data_prd.source.ptr = error_msg;
+ nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
+ }
}
if (!conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
@@ -1908,7 +1980,6 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
qd_http2_buffer_insert(buf, raw_buff_size);
count += raw_buff_size;
DEQ_INSERT_TAIL(buffers, buf);
- //qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming_http - Inserting qd_http2_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
}
}
@@ -2072,10 +2143,37 @@ static void close_connections(qdr_http2_connection_t* conn)
qdr_action_enqueue(http2_adaptor->core, action);
}
+static void clean_session_data(qdr_http2_connection_t* conn)
+{
+    // Drop every stream still attached to this connection (not a shutdown, hence 'false').
+    free_all_connection_streams(conn, false);
+
+    qdr_http2_session_data_t *session_data = conn->session_data;
+
+    //
+    // Tear down the nghttp2 session and clear the pointer. A cleared pointer is what tells the
+    // connection-open path that a new session has to be created with nghttp2_session_client_new
+    //
+    nghttp2_session_del(session_data->session);
+    session_data->session = 0;
+
+    //
+    // The session is gone, so whatever is still queued for writing will never be sent.
+    // Pop and free the buffers one at a time until the list is empty.
+    //
+    qd_http2_buffer_t *head = DEQ_HEAD(session_data->buffs);
+    while (head) {
+        DEQ_REMOVE_HEAD(session_data->buffs);
+        free_qd_http2_buffer_t(head);
+        head = DEQ_HEAD(session_data->buffs);
+    }
+}
+
static void handle_disconnected(qdr_http2_connection_t* conn)
{
sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
+
+ clean_session_data(conn);
+
if (conn->pn_raw_conn) {
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Setting conn->pn_raw_conn=0", conn->conn_id);
conn->pn_raw_conn = 0;
@@ -2128,11 +2226,11 @@ static void egress_conn_timer_handler(void *context)
{
qdr_http2_connection_t* conn = (qdr_http2_connection_t*) context;
- qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
-
- if (conn->connection_established)
+ if (conn->pn_raw_conn || conn->connection_established)
return;
+ qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
+
if (!conn->ingress) {
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] - Egress_conn_timer_handler - Trying to establishing outbound connection", conn->conn_id);
http_connector_establish(conn);
@@ -2249,8 +2347,11 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
send_settings_frame(conn);
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted Ingress ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
} else {
- if (!conn->session_data->session)
+ if (!conn->session_data->session) {
nghttp2_session_client_new(&conn->session_data->session, (nghttp2_session_callbacks *)http2_adaptor->callbacks, (void *)conn);
+ send_settings_frame(conn);
+ conn->client_magic_sent = true;
+ }
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected Egress (PN_RAW_CONNECTION_CONNECTED)", conn->conn_id);
conn->connection_established = true;
create_stream_dispatcher_link(conn);
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index decbfc1..b493136 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -64,7 +64,6 @@ struct qdr_http2_session_data_t {
nghttp2_session *session; // A pointer to the nghttp2s' session object
qd_http2_stream_data_list_t streams; // A session can have many streams.
qd_http2_buffer_list_t buffs; // Buffers for writing
- bool max_buffs_in_pool;
};
struct qdr_http2_stream_data_t {
diff --git a/src/message.c b/src/message.c
index 3508a99..6161275 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2350,6 +2350,23 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
DEQ_APPEND(content->buffers, (*field3_buffers));
}
+/**
+ * Build the content of msg from four composed fields, in order.
+ *
+ * The buffer list of field1 becomes the message's buffer list (and field1's own list is reset),
+ * then the buffer lists of field2, field3 and field4 are appended to it.
+ * receive_complete is stored on the message content as given.
+ */
+void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    qd_buffer_list_t *first  = qd_compose_buffers(field1);
+    qd_buffer_list_t *second = qd_compose_buffers(field2);
+    qd_buffer_list_t *third  = qd_compose_buffers(field3);
+    qd_buffer_list_t *fourth = qd_compose_buffers(field4);
+
+    // Take over the first field's buffers wholesale, leaving that field with an empty list.
+    content->buffers = *first;
+    DEQ_INIT(*first);
+
+    // Chain the remaining fields behind it, preserving their order.
+    DEQ_APPEND(content->buffers, (*second));
+    DEQ_APPEND(content->buffers, (*third));
+    DEQ_APPEND(content->buffers, (*fourth));
+
+    content->receive_complete = receive_complete;
+}
+
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org