You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/27 17:05:13 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and comments. Freed streams and connections.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new c90ff4b  DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and comments. Freed streams and connections.
c90ff4b is described below

commit c90ff4bdc59c51108b163921f313e6fff8aee01d
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Aug 26 12:54:11 2020 -0400

    DISPATCH-1743: Moved the adaptor into http2 folder. Removed unused code and comments. Freed streams and connections.
---
 python/qpid_dispatch/management/qdrouter.json      |  12 +-
 src/CMakeLists.txt                                 |   2 +-
 .../{http_adaptor.c => http2/http2_adaptor.c}      | 620 +++++++++++----------
 .../{http_adaptor.h => http2/http2_adaptor.h}      |  13 +-
 4 files changed, 348 insertions(+), 299 deletions(-)

diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 4770cfe..c9deff4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1112,10 +1112,10 @@
                 "protocolVersion": {
                     "description": "The version of the HTTP protocol supported by this listener.",
                     "type": [
-                        "HTTP/1.x",
-                        "HTTP/2.0"
+                        "HTTP1",
+                        "HTTP2"
                     ],
-                    "default": "HTTP/1.x",
+                    "default": "HTTP1",
                     "required": false,
                     "create": true
                 }
@@ -1146,10 +1146,10 @@
                 "protocolVersion": {
                     "description": "The version of the HTTP protocol supported by this connector.",
                     "type": [
-                        "HTTP/1.x",
-                        "HTTP/2.0"
+                        "HTTP1",
+                        "HTTP2"
                     ],
-                    "default": "HTTP/1.x",
+                    "default": "HTTP1",
                     "required": false,
                     "create": true
                 }
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b780011..92b0c5f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -40,7 +40,7 @@ add_custom_command (
 set(qpid_dispatch_SOURCES
   adaptors/reference_adaptor.c
   adaptors/http_common.c
-  adaptors/http_adaptor.c
+  adaptors/http2/http2_adaptor.c
   adaptors/http1/http1_lib.c
   adaptors/http1/http1_adaptor.c
   adaptors/tcp_adaptor.c
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http2/http2_adaptor.c
similarity index 86%
rename from src/adaptors/http_adaptor.c
rename to src/adaptors/http2/http2_adaptor.c
index 2a66518..cbd1076 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -16,6 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "http2_adaptor.h"
+
 #include <stdio.h>
 #include <inttypes.h>
 
@@ -28,10 +30,9 @@
 
 #include <qpid/dispatch/buffer.h>
 
-#include "qpid/dispatch/protocol_adaptor.h"
-#include "delivery.h"
-#include "http_common.h"
-#include "http_adaptor.h"
+#include <qpid/dispatch/protocol_adaptor.h>
+#include "adaptors/http_common.h"
+
 
 const char *PATH = ":path";
 const char *METHOD = ":method";
@@ -139,8 +140,9 @@ qd_composed_field_t  *qd_message_compose_amqp(qd_message_t *msg,
     return field;
 }
 
-void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
+static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
 {
+    qdr_http2_session_data_t *session_data = stream_data->session_data;
     stream_data->session_data = 0;
     if (stream_data->in_link)
         qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
@@ -149,6 +151,8 @@ void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
     free(stream_data->reply_to);
     qd_compose_free(stream_data->app_properties);
     qd_compose_free(stream_data->body);
+    DEQ_REMOVE(session_data->streams, stream_data);
+    nghttp2_session_set_stream_user_data(session_data->session, stream_data->stream_id, NULL);
     free_qdr_http2_stream_data_t(stream_data);
 }
 
@@ -158,9 +162,7 @@ void free_http2_stream(qdr_http2_session_data_t *session_data, int32_t stream_id
         return;
 
     qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
-    DEQ_REMOVE(session_data->streams, stream_data);
     free_http2_stream_data(stream_data);
-    nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);
 }
 
 static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
@@ -175,14 +177,15 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
     }
 }
 
-void free_qdr_http_connection(qdr_http_connection_t* http_conn)
+void free_qdr_http_connection(qdr_http2_connection_t* http_conn)
 {
     if(http_conn->remote_address) {
         free(http_conn->remote_address);
         http_conn->remote_address = 0;
     }
     if (http_conn->activate_timer) {
-        //qd_timer_free(http_conn->activate_timer);
+        qd_timer_free(http_conn->activate_timer);
+        http_conn->activate_timer = 0;
     }
 
     qdr_http2_stream_data_t *stream_data = 0;
@@ -228,10 +231,13 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
                                                    const uint8_t *data,
                                                    size_t len, void *user_data)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
 
+    if (!stream_data)
+        return 0;
+
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
     qd_buffer_list_append(&buffers, (uint8_t *)data, len);
@@ -261,10 +267,10 @@ static int on_stream_close_callback(nghttp2_session *session,
                                     nghttp2_error_code error_code,
                                     void *user_data)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
-    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback, freeing stream", conn->conn_id, stream_id);
-    free_http2_stream(session_data, stream_id);
+    qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+    stream_data->steam_closed = true;
     return 0;
 }
 
@@ -275,7 +281,7 @@ static ssize_t send_callback(nghttp2_session *session,
                              size_t length,
                              int flags,
                              void *user_data) {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
@@ -289,13 +295,14 @@ static int on_begin_headers_callback(nghttp2_session *session,
                                      const nghttp2_frame *frame,
                                      void *user_data)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = 0;
 
+    // For the client applications, frame->hd.type is either NGHTTP2_HEADERS or NGHTTP2_PUSH_PROMISE
+    // TODO - deal with NGHTTP2_PUSH_PROMISE
     if (frame->hd.type == NGHTTP2_HEADERS) {
         if(frame->headers.cat == NGHTTP2_HCAT_REQUEST && conn->ingress) {
-            // This is a brand new request.
             int32_t stream_id = frame->hd.stream_id;
             qdr_terminus_t *target = qdr_terminus(0);
             stream_data = create_http2_stream_data(session_data, stream_id);
@@ -310,7 +317,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
                                                          QD_INCOMING,
                                                          qdr_terminus(0),  //qdr_terminus_t   *source,
                                                          target,           //qdr_terminus_t   *target,
-                                                         "tcp.ingress.in",         //const char       *name,
+                                                         "http.ingress.in",         //const char       *name,
                                                          0,                //const char       *terminus_addr,
                                                          false,
                                                          NULL,
@@ -322,7 +329,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
             //
 
             //
-            // TODO - Why not create something like a UUID and prefix it with the router id ?
+            // TODO - Andrew was asking if the router could expose the function that generates a dynamic address.
             //
             qdr_terminus_t *dynamic_source = qdr_terminus(0);
             qdr_terminus_set_dynamic(dynamic_source);
@@ -340,6 +347,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
         else if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
             int32_t stream_id = frame->hd.stream_id;
             stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+            // TODO - This is not right.
             stream_data->message = qd_message();
         }
     }
@@ -366,7 +374,7 @@ static int on_header_callback(nghttp2_session *session,
                               void *user_data)
 {
     int32_t stream_id = frame->hd.stream_id;
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
 
@@ -385,7 +393,7 @@ static int on_header_callback(nghttp2_session *session,
 }
 
 
-static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t  *header_and_prop, qdr_http_connection_t *conn, bool receive_complete)
+static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t  *header_and_prop, qdr_http2_connection_t *conn, bool receive_complete)
 {
     if (receive_complete) {
         if (!stream_data->body) {
@@ -424,46 +432,52 @@ static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_compose
 static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
 {
     qd_composed_field_t  *header_and_prop = 0;
-    qdr_http_connection_t *conn  = stream_data->session_data->conn;
+    qdr_http2_connection_t *conn  = stream_data->session_data->conn;
 
     bool delivery_routed = false;
 
     if (conn->ingress) {
         if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
-            delivery_routed = true;
             header_and_prop = qd_message_compose_amqp(stream_data->message,
-                                                  conn->config->address,
-                                                  0,
-                                                  stream_data->reply_to,
-                                                  0, 0,
-                                                  stream_data->stream_id);
+                                                  conn->config->address,  // const char *to
+                                                  0,                      // const char *subject
+                                                  stream_data->reply_to,  // const char *reply_to
+                                                  0,                      // const char *content_type
+                                                  0,                      // const char *content_encoding
+                                                  0);                     // int32_t  correlation_id
             compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
+            delivery_routed = true;
         }
     }
     else {
         if (stream_data->entire_header_arrived) {
-            delivery_routed = true;
             header_and_prop = qd_message_compose_amqp(stream_data->message,
-                                                  stream_data->reply_to, 0, 0, 0, 0,
-                                                  stream_data->stream_id);
+                                                  stream_data->reply_to,  // const char *to
+                                                  0,                      // const char *subject
+                                                  0,                      // const char *reply_to
+                                                  0,                      // const char *content_type
+                                                  0,                      // const char *content_encoding
+                                                  0);                     // int32_t  correlation_id
             compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
+            delivery_routed = true;
         }
     }
 
     return delivery_routed;
 }
 
-static void write_buffers(qdr_http_connection_t *conn)
+static void write_buffers(qdr_http2_connection_t *conn)
 {
     qdr_http2_session_data_t *session_data = conn->session_data;
     size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
-    size_t num_buffs = DEQ_SIZE(session_data->buffs) > pn_buffs_to_write ? pn_buffs_to_write : DEQ_SIZE(session_data->buffs);
+    size_t qd_buffs_to_write = DEQ_SIZE(session_data->buffs);
+    size_t num_buffs = qd_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_buffs_to_write;
 
-    //
-    // No buffers to write, no need to proceed.
-    //
     if (num_buffs == 0) {
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Zero buffers written in write_buffers - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
+        //
+        // No buffers to write, cannot proceed.
+        //
+        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written 0 buffers in write_buffers() - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
         return;
     }
 
@@ -488,9 +502,9 @@ static void write_buffers(qdr_http_connection_t *conn)
 
     if (i >0) {
         size_t num_buffers_written = pn_raw_connection_write_buffers(session_data->conn->pn_raw_conn, raw_buffers, num_buffs);
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in pn_raw_connection_write_buffers", conn->conn_id, num_buffers_written, total_bytes);
+        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
         if (num_buffs != num_buffers_written) {
-            assert(false);
+            //TODO - Partial write: pn_raw_connection_write_buffers() took fewer buffers than requested; the remainder must be handled.
         }
     }
 }
@@ -506,7 +520,7 @@ static void write_buffers(qdr_http_connection_t *conn)
 //}
 
 
-static void send_settings_frame(qdr_http_connection_t *conn)
+static void send_settings_frame(qdr_http2_connection_t *conn)
 {
     qdr_http2_session_data_t *session_data = conn->session_data;
     nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
@@ -516,7 +530,7 @@ static void send_settings_frame(qdr_http_connection_t *conn)
     // You must call nghttp2_session_send after calling nghttp2_submit_settings
     int rv = nghttp2_submit_settings(session_data->session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
     if (rv != 0) {
-        qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
+        qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
         return;
     }
     qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Initial SETTINGS frame sent", conn->conn_id);
@@ -528,12 +542,15 @@ static void send_settings_frame(qdr_http_connection_t *conn)
 static int on_frame_recv_callback(nghttp2_session *session,
                                   const nghttp2_frame *frame, void *user_data)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
 
     int32_t stream_id = frame->hd.stream_id;
     qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
 
+    if (!stream_data)
+        return 0;
+
     switch (frame->hd.type) {
     case NGHTTP2_SETTINGS: {
         qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
@@ -601,171 +618,6 @@ static int on_frame_recv_callback(nghttp2_session *session,
     return 0;
 }
 
-qdr_http_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
-{
-    qdr_http_connection_t* ingress_http_conn = NEW(qdr_http_connection_t);
-    ingress_http_conn->ingress = true;
-    ingress_http_conn->context.context = ingress_http_conn;
-    ingress_http_conn->context.handler = &handle_connection_event;
-    ingress_http_conn->config = &(listener->config);
-    ingress_http_conn->server = listener->server;
-    ingress_http_conn->pn_raw_conn = pn_raw_connection();
-
-    ingress_http_conn->session_data = new_qdr_http2_session_data_t();
-    ZERO(ingress_http_conn->session_data);
-    DEQ_INIT(ingress_http_conn->session_data->buffs);
-    DEQ_INIT(ingress_http_conn->session_data->streams);
-    ingress_http_conn->session_data->conn = ingress_http_conn;
-
-    nghttp2_session_server_new(&(ingress_http_conn->session_data->session), (nghttp2_session_callbacks*)http_adaptor->callbacks, ingress_http_conn);
-
-    pn_raw_connection_set_context(ingress_http_conn->pn_raw_conn, ingress_http_conn);
-    pn_listener_raw_accept(listener->pn_listener, ingress_http_conn->pn_raw_conn);
-    ingress_http_conn->connection_established = true;
-    send_settings_frame(ingress_http_conn);
-    return ingress_http_conn;
-}
-
-static void grant_read_buffers(qdr_http_connection_t *conn)
-{
-    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
-    // Give proactor more read buffers for the pn_raw_conn
-    if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
-        size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
-        while (desired) {
-            size_t i;
-            for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
-                qd_buffer_t *buf = qd_buffer();
-                raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
-                raw_buffers[i].capacity = qd_buffer_capacity(buf);
-                raw_buffers[i].size = 0;
-                raw_buffers[i].offset = 0;
-                raw_buffers[i].context = (uintptr_t) buf;
-            }
-            desired -= i;
-            pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
-        }
-    }
-}
-
-
-static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
-{
-}
-
-
-static void qdr_http_flow(void *context, qdr_link_t *link, int credit)
-{
-}
-
-
-static void qdr_http_offer(void *context, qdr_link_t *link, int delivery_count)
-{
-}
-
-
-static void qdr_http_drained(void *context, qdr_link_t *link)
-{
-}
-
-
-static void qdr_http_drain(void *context, qdr_link_t *link, bool mode)
-{
-}
-
-static int qdr_http_get_credit(void *context, qdr_link_t *link)
-{
-    return 10;
-}
-
-
-static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
-{
-}
-
-
-static void qdr_http_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
-{
-}
-
-
-static void qdr_http_conn_trace(void *context, qdr_connection_t *conn, bool trace)
-{
-}
-
-
-static void qdr_http_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
-                                 qdr_terminus_t *source, qdr_terminus_t *target,
-                                 qd_session_class_t session_class)
-{
-}
-
-
-static void qdr_copy_reply_to(qdr_http2_stream_data_t* stream_data, qd_iterator_t* reply_to)
-{
-    int length = qd_iterator_length(reply_to);
-    stream_data->reply_to = malloc(length + 1);
-    qd_iterator_strncpy(reply_to, stream_data->reply_to, length + 1);
-}
-
-
-static void qdr_http_second_attach(void *context, qdr_link_t *link,
-                                  qdr_terminus_t *source, qdr_terminus_t *target)
-{
-    qdr_http2_stream_data_t *stream_data =  (qdr_http2_stream_data_t*)qdr_link_get_context(link);
-    if (stream_data) {
-        if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
-            if (stream_data->session_data->conn->ingress) {
-                qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
-                qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to is available now, trying to route delivery", stream_data->session_data->conn->conn_id);
-                if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
-                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
-                }
-                else {
-                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery not routed", stream_data->session_data->conn->conn_id);
-                }
-                grant_read_buffers(stream_data->session_data->conn);
-            }
-            qdr_link_flow(http_adaptor->core, link, 10, false);
-        }
-    }
-}
-
-static void qdr_http_activate(void *notused, qdr_connection_t *c)
-{
-    qdr_http_connection_t* conn = (qdr_http_connection_t*) qdr_connection_get_context(c);
-    if (conn) {
-        if (conn->pn_raw_conn) {
-            qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
-            pn_raw_connection_wake(conn->pn_raw_conn);
-        } else if (conn->activate_timer) {
-            // On egress, the raw connection is only created once the
-            // first part of the message encapsulating the
-            // client->server half of the stream has been
-            // received. Prior to that however a subscribing link (and
-            // its associated connection must be setup), for which we
-            // fake wakeup by using a timer.
-            qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, no socket yet so scheduling timer", conn->conn_id);
-            qd_timer_schedule(conn->activate_timer, 0);
-        } else {
-            qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
-        }
-    }
-}
-
-static int qdr_http_push(void *context, qdr_link_t *link, int limit)
-{
-    return qdr_link_process_deliveries(http_adaptor->core, link, limit);
-}
-
-
-static void http_connector_establish(qdr_http_connection_t *conn)
-{
-    qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config->host_port);
-    conn->pn_raw_conn = pn_raw_connection();
-    pn_raw_connection_set_context(conn->pn_raw_conn, conn);
-    pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->pn_raw_conn, conn->config->host_port);
-}
 
 ssize_t read_callback(nghttp2_session *session,
                                   int32_t stream_id, uint8_t *buf,
@@ -773,13 +625,13 @@ ssize_t read_callback(nghttp2_session *session,
                                   nghttp2_data_source *source,
                                   void *user_data)
 {
-    qdr_link_t *link = source->ptr;
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
 
     qd_message_depth_status_t status = qd_message_check_depth(stream_data->message, QD_DEPTH_BODY);
 
+    //TODO - Discuss with team
     write_buffers(session_data->conn);
 
     switch (status) {
@@ -830,9 +682,6 @@ ssize_t read_callback(nghttp2_session *session,
             if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write==0) {
                 // We cannot send anything, we need to come back here.
 
-                //TODO - This will not pass code review. Need to investigate.
-                link->credit_to_core = 0;
-                qdr_link_flow(http_adaptor->core, link, 1, false);
                 if (stream_data->body_data_buff_count == 0) {
                     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
                     qd_message_body_data_release(stream_data->curr_body_data);
@@ -859,6 +708,8 @@ ssize_t read_callback(nghttp2_session *session,
             qd_message_body_data_buffers(body_data, raw_buffers, buff_offset, 1);
             stream_data->curr_body_data_buff_offset += 1;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, raw_buffers[0].size);
+
+            // TODO - There is a bug here. Must make sure that the raw buffer size does not exceed the length before copying.
             memcpy(buf, raw_buffers[0].bytes, raw_buffers[0].size);
             stream_data->body_data_buff_count -= 1;
             if (!stream_data->body_data_buff_count) {
@@ -894,7 +745,6 @@ ssize_t read_callback(nghttp2_session *session,
                 qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - send is complete, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
             }
 
-            qdr_link_flow(http_adaptor->core, link, 1, false);
             break;
         }
 
@@ -904,7 +754,6 @@ ssize_t read_callback(nghttp2_session *session,
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
             qd_message_body_data_release(stream_data->curr_body_data);
-            qdr_link_flow(http_adaptor->core, link, 1, false);
             stream_data->disposition = PN_REJECTED;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
             break;
@@ -915,7 +764,6 @@ ssize_t read_callback(nghttp2_session *session,
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
             qd_message_body_data_release(stream_data->curr_body_data);
-            qdr_link_flow(http_adaptor->core, link, 1, false);
             stream_data->disposition = PN_REJECTED;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
             break;
@@ -924,7 +772,6 @@ ssize_t read_callback(nghttp2_session *session,
     }
 
     case QD_MESSAGE_DEPTH_INVALID:
-        qdr_link_flow(http_adaptor->core, link, 1, false);
         qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INVALID", conn->conn_id, stream_data->stream_id);
         stream_data->disposition = PN_REJECTED;
         break;
@@ -937,10 +784,177 @@ ssize_t read_callback(nghttp2_session *session,
     return 0;
 }
 
-uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *link)
+
+
+qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
+{
+    qdr_http2_connection_t* ingress_http_conn = NEW(qdr_http2_connection_t);
+    ingress_http_conn->ingress = true;
+    ingress_http_conn->context.context = ingress_http_conn;
+    ingress_http_conn->context.handler = &handle_connection_event;
+    ingress_http_conn->config = &(listener->config);
+    ingress_http_conn->server = listener->server;
+    ingress_http_conn->pn_raw_conn = pn_raw_connection();
+
+    ingress_http_conn->session_data = new_qdr_http2_session_data_t();
+    ZERO(ingress_http_conn->session_data);
+    DEQ_INIT(ingress_http_conn->session_data->buffs);
+    DEQ_INIT(ingress_http_conn->session_data->streams);
+    ingress_http_conn->session_data->conn = ingress_http_conn;
+    ingress_http_conn->data_prd.read_callback = read_callback;
+
+    nghttp2_session_server_new(&(ingress_http_conn->session_data->session), (nghttp2_session_callbacks*)http_adaptor->callbacks, ingress_http_conn);
+    pn_raw_connection_set_context(ingress_http_conn->pn_raw_conn, ingress_http_conn);
+    pn_listener_raw_accept(listener->pn_listener, ingress_http_conn->pn_raw_conn);
+    return ingress_http_conn;
+}
+
+static void grant_read_buffers(qdr_http2_connection_t *conn)
+{
+    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+    // Give proactor more read buffers for the pn_raw_conn
+    if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
+        size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+        while (desired) {
+            size_t i;
+            for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+                qd_buffer_t *buf = qd_buffer();
+                raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
+                raw_buffers[i].capacity = qd_buffer_capacity(buf);
+                raw_buffers[i].size = 0;
+                raw_buffers[i].offset = 0;
+                raw_buffers[i].context = (uintptr_t) buf;
+            }
+            desired -= i;
+            pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
+        }
+    }
+}
+
+
+static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+}
+
+
+static void qdr_http_flow(void *context, qdr_link_t *link, int credit)
+{
+}
+
+
+static void qdr_http_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qdr_http_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qdr_http_drain(void *context, qdr_link_t *link, bool mode)
+{
+}
+
+static int qdr_http_get_credit(void *context, qdr_link_t *link)
+{
+    return 10;
+}
+
+
+static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+}
+
+
+static void qdr_http_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+}
+
+
+static void qdr_http_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+}
+
+
+static void qdr_http_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
+                                 qdr_terminus_t *source, qdr_terminus_t *target,
+                                 qd_session_class_t session_class)
+{
+}
+
+
+static void qdr_copy_reply_to(qdr_http2_stream_data_t* stream_data, qd_iterator_t* reply_to)
+{
+    int length = qd_iterator_length(reply_to);
+    stream_data->reply_to = malloc(length + 1);
+    qd_iterator_strncpy(reply_to, stream_data->reply_to, length + 1);
+}
+
+
+static void qdr_http_second_attach(void *context, qdr_link_t *link,
+                                  qdr_terminus_t *source, qdr_terminus_t *target)
+{
+    qdr_http2_stream_data_t *stream_data =  (qdr_http2_stream_data_t*)qdr_link_get_context(link);
+    if (stream_data) {
+        if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
+            if (stream_data->session_data->conn->ingress) {
+                qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
+                if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
+                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
+                }
+                else {
+                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available but delivery not routed", stream_data->session_data->conn->conn_id);
+                }
+                grant_read_buffers(stream_data->session_data->conn);
+            }
+            qdr_link_flow(http_adaptor->core, link, 10, false);
+        }
+    }
+}
+
+static void qdr_http_activate(void *notused, qdr_connection_t *c)
+{
+    qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c);
+    if (conn) {
+        if (conn->pn_raw_conn) {
+            qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
+            pn_raw_connection_wake(conn->pn_raw_conn);
+        }
+        else if (conn->activate_timer) {
+            // On egress, the raw connection is only created once the
+            // first part of the message encapsulating the
+            // client->server half of the stream has been
+            // received. Prior to that, however, a subscribing link (and
+            // its associated connection) must be set up, for which we
+            // fake a wakeup by using a timer.
+            qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, no socket yet so scheduling timer", conn->conn_id);
+            qd_timer_schedule(conn->activate_timer, 0);
+        } else {
+            qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
+        }
+    }
+}
+
+static int qdr_http_push(void *context, qdr_link_t *link, int limit)
+{
+    return qdr_link_process_deliveries(http_adaptor->core, link, limit);
+}
+
+
+static void http_connector_establish(qdr_http2_connection_t *conn)
+{
+    qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config->host_port);
+    conn->pn_raw_conn = pn_raw_connection();
+    pn_raw_connection_set_context(conn->pn_raw_conn, conn);
+    pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->pn_raw_conn, conn->config->host_port);
+}
+
+
+uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
 {
     qdr_http2_session_data_t *session_data = stream_data->session_data;
-    qdr_http_connection_t *conn = session_data->conn;
+    qdr_http2_connection_t *conn = session_data->conn;
     qd_message_t *message = stream_data->message;
     if (stream_data->out_dlv) {
 
@@ -965,8 +979,6 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
 
             nghttp2_nv hdrs[count];
 
-            int stream_id = stream_data->session_data->conn->ingress?stream_data->stream_id: -1;
-
             for (uint32_t idx = 0; idx < count; idx++) {
                 qd_parsed_field_t *key = qd_parse_sub_key(app_properties_fld, idx);
                 qd_parsed_field_t *val = qd_parse_sub_value(app_properties_fld, idx);
@@ -980,13 +992,18 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
                 hdrs[idx].flags = NGHTTP2_NV_FLAG_NONE;
             }
 
+            int stream_id = stream_data->session_data->conn->ingress?stream_data->stream_id: -1;
+
             // This does not really submit the request. We need to read the bytes
             //nghttp2_session_set_next_stream_id(session_data->session, stream_data->stream_id);
             stream_data->stream_id = nghttp2_submit_headers(session_data->session,
                                                             0,
-                                                            stream_id, NULL, hdrs,
+                                                            stream_id,
+                                                            NULL,
+                                                            hdrs,
                                                             count,
                                                             stream_data);
+
             if (stream_id != -1) {
                 stream_data->stream_id = stream_id;
             }
@@ -1012,8 +1029,6 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
         }
         else {
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
-            conn->data_prd.read_callback = read_callback;
-            conn->data_prd.source.ptr = link;
             int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
             if (rv != 0) {
                 qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
@@ -1036,18 +1051,13 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 {
     qdr_http2_stream_data_t *stream_data =  (qdr_http2_stream_data_t*)qdr_link_get_context(link);
 
-    if (!stream_data)
-        return 0;
+    assert(stream_data);
 
-    qdr_http_connection_t *conn = stream_data->session_data->conn;
+    qdr_http2_connection_t *conn = stream_data->session_data->conn;
 
     if (link == stream_data->session_data->conn->stream_dispatcher) {
         qd_message_t *msg = qdr_delivery_message(delivery);
-        qd_iterator_t     *iter  = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
-        qd_parsed_field_t *cid_field = qd_parse(iter);
-        uint32_t stream_id = qd_parse_as_int(cid_field);
-
-        qdr_http2_stream_data_t *stream_data = create_http2_stream_data(conn->session_data, stream_id);
+        qdr_http2_stream_data_t *stream_data = create_http2_stream_data(conn->session_data, 0);
 
         stream_data->message = qdr_delivery_message(delivery);
         stream_data->out_dlv = delivery;
@@ -1057,7 +1067,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                                                      QD_OUTGOING,
                                                      source,           //qdr_terminus_t   *source,
                                                      qdr_terminus(0),  //qdr_terminus_t   *target,
-                                                     "tcp.egress.out", //const char       *name,
+                                                     "http.egress.out", //const char       *name,
                                                      0,                //const char       *terminus_addr,
                                                      true,
                                                      delivery,
@@ -1083,7 +1093,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
         qdr_link_set_context(stream_data->in_link, stream_data);
 
         //Let's make an outbound connection to the configured connector.
-        qdr_http_connection_t *conn = stream_data->session_data->conn;
+        qdr_http2_connection_t *conn = stream_data->session_data->conn;
         if (!conn->connection_established) {
             if (!conn->ingress) {
                 http_connector_establish(conn);
@@ -1096,9 +1106,8 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                 stream_data->message = qdr_delivery_message(delivery);
                 stream_data->out_dlv = delivery;
             }
-            return handle_outgoing_http(stream_data, link);
+            return handle_outgoing_http(stream_data);
         }
-        qdr_link_flow(http_adaptor->core, link, 1, false);
     }
     return 0;
 }
@@ -1113,7 +1122,7 @@ void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector
 }
 
 
-static int handle_incoming_http(qdr_http_connection_t *conn)
+static int handle_incoming_http(qdr_http2_connection_t *conn)
 {
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
@@ -1121,6 +1130,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
     size_t n;
     int count = 0;
 
+    // TODO - Remove after using connection safe pointers.
     if (!conn->pn_raw_conn)
         return 0;
 
@@ -1136,7 +1146,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
     }
 
     //
-    // Read each buffer in the buffer chain and call nghttp2_session_mem_recv with each buffer content
+    // Read each buffer in the buffer chain and call nghttp2_session_mem_recv with buffer content
     //
     qd_buffer_t *buf = DEQ_HEAD(buffers);
     qd_buffer_t *curr_buf = 0;
@@ -1154,7 +1164,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
 }
 
 
-qdr_http_connection_t *qdr_http_connection_ingress_accept(qdr_http_connection_t* ingress_http_conn)
+qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_t* ingress_http_conn)
 {
     ingress_http_conn->remote_address = get_address_string(ingress_http_conn->pn_raw_conn);
     qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
@@ -1194,28 +1204,54 @@ qdr_http_connection_t *qdr_http_connection_ingress_accept(qdr_http_connection_t*
     ingress_http_conn->qdr_conn = conn;
     ingress_http_conn->conn_id = conn->identity;
     qdr_connection_set_context(conn, ingress_http_conn);
-    //grant_read_buffers(ingress_http_conn);
+    ingress_http_conn->connection_established = true;
     return ingress_http_conn;
 }
 
 
+static void restart_streams(qdr_http2_connection_t *http_conn)
+{
+    qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
+    while (stream_data) {
+        if (stream_data->steam_closed) {
+            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] freeing stream", http_conn->conn_id, stream_data->stream_id);
+            qdr_http2_stream_data_t *next_stream_data = DEQ_NEXT(stream_data);
+            handle_outgoing_http(stream_data);
+            free_http2_stream_data(stream_data);
+            stream_data = next_stream_data;
+        }
+        else {
+            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] Restarting stream in restart_streams()", http_conn->conn_id, stream_data->stream_id);
+            handle_outgoing_http(stream_data);
+            stream_data = DEQ_NEXT(stream_data);
+        }
+    }
+}
+
+
 static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t*) context;
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t*) context;
     qd_log_source_t *log = http_adaptor->log_source;
     switch (pn_event_type(e)) {
     case PN_RAW_CONNECTION_CONNECTED: {
         if (conn->ingress) {
+            send_settings_frame(conn);
             qdr_http_connection_ingress_accept(conn);
-            qd_log(log, QD_LOG_INFO, "[C%i] Accepted from %s", conn->conn_id, conn->remote_address);
+            qd_log(log, QD_LOG_INFO, "[C%i] Accepted ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
         } else {
-            qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
+            qd_log(log, QD_LOG_INFO, "[C%i] Connected (PN_RAW_CONNECTION_CONNECTED)", conn->conn_id);
             conn->connection_established = true;
             qdr_connection_process(conn->qdr_conn);
         }
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
+        printf ("PN_RAW_CONNECTION_CLOSED_READ\n");
+        fflush(stdout);
+        qdr_connection_set_context(conn->qdr_conn, 0);
+        qdr_connection_closed(conn->qdr_conn);
+        conn->qdr_conn = 0;
         pn_raw_connection_close(conn->pn_raw_conn);
         conn->pn_raw_conn = 0;
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
@@ -1227,7 +1263,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
-        qdr_connection_closed(conn->qdr_conn);
+        if (conn->qdr_conn)
+            qdr_connection_closed(conn->qdr_conn);
         free_qdr_http_connection(conn);
         break;
     }
@@ -1246,10 +1283,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_READ: {
-        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ", conn->conn_id);
         int read = handle_incoming_http(conn);
-        qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
-        while (qdr_connection_process(conn->qdr_conn)) {}
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read);
+        //while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_WRITTEN: {
@@ -1261,12 +1297,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
                 written += buffs[i].size;
                 qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
                 assert(qd_buff);
-                if (qd_buff)
+                if (qd_buff != NULL)
                     qd_buffer_free(qd_buff);
             }
         }
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes", conn->conn_id, written);
-        while (qdr_connection_process(conn->qdr_conn)) {}
+        restart_streams(conn);
         break;
     }
     default:
@@ -1315,6 +1351,9 @@ static bool http_listener_listen(qd_http_lsnr_t *li) {
 
 qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
+    // TODO - Add lws in the qd_lws_listener.
+    // TODO - Separate commit.
+    // TODO - use http2 in function names
     qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &handle_listener_event);
     if (!li) {
         qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: no memory");
@@ -1332,13 +1371,13 @@ qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bri
 
 void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener)
 {
-    // TBD?
+    // TODO - Not implemented yet.
 }
 
 
 static void on_activate(void *context)
 {
-    qdr_http_connection_t* conn = (qdr_http_connection_t*) context;
+    qdr_http2_connection_t* conn = (qdr_http2_connection_t*) context;
 
     qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] on_activate", conn->conn_id);
     while (qdr_connection_process(conn->qdr_conn)) {}
@@ -1346,36 +1385,34 @@ static void on_activate(void *context)
 
 
 
-qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector)
+qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector)
 {
-    qdr_http_connection_t* egress_conn = NEW(qdr_http_connection_t);
-    ZERO(egress_conn);
-    //FIXME: this is only needed while waiting for raw_connection_wake
-    //functionality in proton
-    egress_conn->activate_timer = qd_timer(http_adaptor->core->qd, on_activate, egress_conn);
-
-    egress_conn->ingress = false;
-    egress_conn->context.context = egress_conn;
-    egress_conn->context.handler = &handle_connection_event;
-    egress_conn->config = &(connector->config);
-    egress_conn->server = connector->server;
-    egress_conn->data_prd.read_callback = read_callback;
-
-    egress_conn->session_data = new_qdr_http2_session_data_t();
-    ZERO(egress_conn->session_data);
-    DEQ_INIT(egress_conn->session_data->buffs);
-    DEQ_INIT(egress_conn->session_data->streams);
-    egress_conn->session_data->conn = egress_conn;
-
-    nghttp2_session_client_new(&egress_conn->session_data->session, (nghttp2_session_callbacks*)http_adaptor->callbacks, (void *)egress_conn);
-
-    //pn_raw_connection_set_context(egress_conn->pn_raw_conn, egress_conn);
+    // TODO - Make this a pooled object.
+    qdr_http2_connection_t* egress_http_conn = NEW(qdr_http2_connection_t);
+    ZERO(egress_http_conn);
+    egress_http_conn->activate_timer = qd_timer(http_adaptor->core->qd, on_activate, egress_http_conn);
+
+    egress_http_conn->ingress = false;
+    egress_http_conn->context.context = egress_http_conn;
+    egress_http_conn->context.handler = &handle_connection_event;
+    egress_http_conn->config = &(connector->config);
+    egress_http_conn->server = connector->server;
+    egress_http_conn->data_prd.read_callback = read_callback;
+
+    egress_http_conn->session_data = new_qdr_http2_session_data_t();
+    ZERO(egress_http_conn->session_data);
+    DEQ_INIT(egress_http_conn->session_data->buffs);
+    DEQ_INIT(egress_http_conn->session_data->streams);
+    egress_http_conn->session_data->conn = egress_http_conn;
+
+    nghttp2_session_client_new(&egress_http_conn->session_data->session, (nghttp2_session_callbacks*)http_adaptor->callbacks, (void *)egress_http_conn);
+
     qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
                                                       false, //bool             is_authenticated,
                                                       true,  //bool             opened,
                                                       "",   //char            *sasl_mechanisms,
                                                       QD_OUTGOING, //qd_direction_t   dir,
-                                                      egress_conn->config->host_port,    //const char      *host,
+                                                      egress_http_conn->config->host_port,    //const char      *host,
                                                       "",    //const char      *ssl_proto,
                                                       "",    //const char      *ssl_cipher,
                                                       "",    //const char      *user,
@@ -1391,7 +1428,7 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
                                                    true,
                                                    QDR_ROLE_NORMAL,
                                                    1,
-                                                   qd_server_allocate_connection_id(egress_conn->server),
+                                                   qd_server_allocate_connection_id(egress_http_conn->server),
                                                    0,
                                                    0,
                                                    false,
@@ -1403,13 +1440,18 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
                                                    info,
                                                    0,
                                                    0);
-    egress_conn->qdr_conn = conn;
-    egress_conn->conn_id = conn->identity;
-    qdr_connection_set_context(conn, egress_conn);
+    egress_http_conn->qdr_conn = conn;
+
+    //
+    // Using the same identity as the qdr_connection's identity. This is really helpful
+    // when we try to trace the life cycle of a delivery.
+    //
+    egress_http_conn->conn_id = conn->identity;
+    qdr_connection_set_context(conn, egress_http_conn);
 
     qdr_terminus_t *source = qdr_terminus(0);
-    qdr_terminus_set_address(source, egress_conn->config->address);
-    egress_conn->stream_dispatcher = qdr_link_first_attach(conn,
+    qdr_terminus_set_address(source, egress_http_conn->config->address);
+    egress_http_conn->stream_dispatcher = qdr_link_first_attach(conn,
                                                            QD_OUTGOING,
                                                            source,           //qdr_terminus_t   *source,
                                                            qdr_terminus(0),  //qdr_terminus_t   *target,
@@ -1417,18 +1459,17 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
                                                            0,                //const char       *terminus_addr,
                                                            false,
                                                            0,
-                                                           &(egress_conn->stream_dispatcher_id));
-    // Create a dummy stream_data object and set that as context
+                                                           &(egress_http_conn->stream_dispatcher_id));
+
+    // Create a dummy stream_data object and set that as context.
     qdr_http2_stream_data_t *stream_data = new_qdr_http2_stream_data_t();
     ZERO(stream_data);
-
     stream_data->session_data = new_qdr_http2_session_data_t();
     ZERO(stream_data->session_data);
-    stream_data->stream_id = 0;
-    stream_data->session_data->conn = egress_conn;
+    stream_data->session_data->conn = egress_http_conn;
 
-    qdr_link_set_context(egress_conn->stream_dispatcher, stream_data);
-    return egress_conn;
+    qdr_link_set_context(egress_http_conn->stream_dispatcher, stream_data);
+    return egress_http_conn;
 }
 
 
@@ -1470,8 +1511,8 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
     adaptor->core    = core;
     adaptor->adaptor = qdr_protocol_adaptor(core,
                                             "http",                // name
-                                            adaptor,              // context
-                                            qdr_http_activate,                    // activate
+                                            adaptor,               // context
+                                            qdr_http_activate,
                                             qdr_http_first_attach,
                                             qdr_http_second_attach,
                                             qdr_http_detach,
@@ -1491,6 +1532,9 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
     DEQ_INIT(adaptor->listeners);
     DEQ_INIT(adaptor->connectors);
 
+    //
+    // Register all nghttp2 callbacks.
+    //
     nghttp2_session_callbacks *callbacks;
     nghttp2_session_callbacks_new(&callbacks);
     nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http2/http2_adaptor.h
similarity index 90%
rename from src/adaptors/http_adaptor.h
rename to src/adaptors/http2/http2_adaptor.h
index 53a05b1..33fa3f4 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -24,6 +24,10 @@
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/log.h>
 #include <nghttp2/nghttp2.h>
+#include <qpid/dispatch/protocol_adaptor.h>
+
+#include "server_private.h"
+#include "adaptors/http_common.h"
 
 
 // We already have a qd_http_listener_t defined in http-libwebsockets.c
@@ -32,11 +36,11 @@
 // and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
 typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
 typedef struct qdr_http2_stream_data_t  qdr_http2_stream_data_t;
-typedef struct qdr_http_connection_t    qdr_http_connection_t;
+typedef struct qdr_http2_connection_t    qdr_http2_connection_t;
 DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
 
 struct qdr_http2_session_data_t {
-    qdr_http_connection_t       *conn;       // Connection associated with the session_data
+    qdr_http2_connection_t       *conn;       // Connection associated with the session_data
     nghttp2_session             *session;    // A pointer to the nghttp2s' session object
     qd_http2_stream_data_list_t  streams;    // A session can have many streams.
     qd_buffer_list_t             buffs;      // Buffers for writing
@@ -63,11 +67,12 @@ struct qdr_http2_stream_data_t {
     int                            body_data_buff_count;
     int32_t                        stream_id;
 
-    bool                     entire_header_arrived; // true if all the headershave arrived, just before the start of the data frame or just before the END_STREAM.
+    bool                     entire_header_arrived;
     bool                     header_sent;
+    bool                     steam_closed;
 };
 
-struct qdr_http_connection_t {
+struct qdr_http2_connection_t {
     qd_handler_context_t     context;
     qdr_connection_t        *qdr_conn;
     pn_raw_connection_t     *pn_raw_conn;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org