You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2020/08/12 17:16:23 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated (a6519cf -> cb7d9cd)

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a change to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from a6519cf  DISPATCH-1744: Fixed minor error in field name
     new c0fb276  DISPATCH-1654: Initial TCP adaptor
     new fea7d27  DISPATCH-1742: avoid using uninitialised value in conditional
     new cb7d9cd  DISPATCH-1742: free protocol adaptor before core

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 python/qpid_dispatch/management/qdrouter.json      |  61 ++
 python/qpid_dispatch_internal/dispatch.py          |   2 +
 python/qpid_dispatch_internal/management/agent.py  |  28 +
 python/qpid_dispatch_internal/management/config.py |   2 +-
 src/CMakeLists.txt                                 |   1 +
 src/adaptors/tcp_adaptor.c                         | 917 +++++++++++++++++++++
 src/adaptors/tcp_adaptor.h                         |  79 ++
 src/message.c                                      |   1 +
 src/router_node.c                                  |   2 +-
 9 files changed, 1091 insertions(+), 2 deletions(-)
 create mode 100644 src/adaptors/tcp_adaptor.c
 create mode 100644 src/adaptors/tcp_adaptor.h


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/03: DISPATCH-1742: avoid using uninitialised value in conditional

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit fea7d27eb8b5785e9683fcd936a8f4a16d4e8ab7
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Aug 12 18:13:17 2020 +0100

    DISPATCH-1742: avoid using uninitialised value in conditional
---
 src/message.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/message.c b/src/message.c
index 4c5cd9c..8c2cf57 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2474,6 +2474,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
 
     qd_section_status_t section_status;
     qd_field_location_t location;
+    ZERO(&location);
 
     section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
                                            BODY_DATA_SHORT, 3, TAGS_BINARY,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/03: DISPATCH-1654: Initial TCP adaptor

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit c0fb276ecc3a193e9f03790cd3b4f910677cbe3a
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Tue Aug 11 11:32:33 2020 +0100

    DISPATCH-1654: Initial TCP adaptor
---
 python/qpid_dispatch/management/qdrouter.json      |  61 ++
 python/qpid_dispatch_internal/dispatch.py          |   2 +
 python/qpid_dispatch_internal/management/agent.py  |  28 +
 python/qpid_dispatch_internal/management/config.py |   2 +-
 src/CMakeLists.txt                                 |   1 +
 src/adaptors/tcp_adaptor.c                         | 917 +++++++++++++++++++++
 src/adaptors/tcp_adaptor.h                         |  79 ++
 7 files changed, 1089 insertions(+), 1 deletion(-)

diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 421fa0a..0bb2bec 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1151,6 +1151,66 @@
             }
         },
 
+        "tcpListener": {
+            "description": "Ingress TCP bridge.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE", "DELETE"],
+            "attributes": {
+                "address": {
+                    "description":"Address of this bridge",
+                    "type": "string",
+                    "create": true
+                },
+                "host": {
+                    "description":"A host name, IPV4 or IPV6 literal, or the empty string. The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.",
+                    "type": "string",
+                    "default": "0.0.0.0",
+                    "create": true
+                },
+                "port": {
+                    "description": "Port number or symbolic service name.  If '0', the router shall assign an ephemeral port to the listener and log the port number with a log of the form 'SERVER (notice) Listening on <host>:<assigned-port> (<listener-name>)'",
+                    "type": "string",
+                    "create": true
+                },
+                "siteId": {
+                    "type": "string",
+                    "required": false,
+                    "description": "Used to identify where connection is handled.",
+                    "create": true
+                }
+            }
+        },
+
+        "tcpConnector": {
+            "description": "Egress TCP bridge.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE", "DELETE"],
+            "attributes": {
+                "address": {
+                    "description":"Address of this bridge",
+                    "type": "string",
+                    "create": true
+                },
+                "host": {
+                    "description":"IP address: ipv4 or ipv6 literal or a host name",
+                    "type": "string",
+                    "create": true
+                },
+                "port": {
+                    "description": "Port number or symbolic service name.",
+                    "type": "string",
+                    "create": true
+
+                },
+                "siteId": {
+                    "type": "string",
+                    "required": false,
+                    "description": "Used to identify origin of connections.",
+                    "create": true
+                }
+            }
+        },
+
         "log": {
             "description": "Configure logging for a particular module. You can use the `UPDATE` operation to change log settings while the router is running.",
             "extends": "configurationEntity",
@@ -1174,6 +1234,7 @@
                         "CONN_MGR",
                         "PYTHON",
                         "PROTOCOL",
+                        "TCP_ADAPTOR",
                         "HTTP_ADAPTOR",
                         "DEFAULT"
                     ],
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index ad99749..f5bd91a 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -67,6 +67,8 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_ssl_profile, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_sasl_plugin, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_tcp_listener, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_tcp_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_http_lsnr, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_http_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, ctypes.c_void_p])
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index af6d8a2..ec5696c 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -586,6 +586,34 @@ class HttpListenerEntity(EntityAdapter):
     def _delete(self):
         self._qd.qd_dispatch_delete_http_listener(self._dispatch, self._implementations[0].key)
 
+class TcpListenerEntity(EntityAdapter):
+    def create(self):
+        config_listener = self._qd.qd_dispatch_configure_tcp_listener(self._dispatch, self)
+        return config_listener
+
+    def _identifier(self):
+        return _host_port_name_identifier(self)
+
+    def __str__(self):
+        return super(TcpListenerEntity, self).__str__().replace("Entity(", "TcpListenerEntity(")
+
+    def _delete(self):
+        self._qd.qd_dispatch_delete_tcp_listener(self._dispatch, self._implementations[0].key)
+
+
+class TcpConnectorEntity(EntityAdapter):
+    def create(self):
+        config_connector = self._qd.qd_dispatch_configure_tcp_connector(self._dispatch, self)
+        return config_connector
+
+    def _identifier(self):
+        return _host_port_name_identifier(self)
+
+    def __str__(self):
+        return super(TcpConnectorEntity, self).__str__().replace("Entity(", "TcpConnectorEntity(")
+
+    def _delete(self):
+        self._qd.qd_dispatch_delete_tcp_connector(self._dispatch, self._implementations[0].key)
 
 class HttpConnectorEntity(EntityAdapter):
     def create(self):
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 0edfebb..21016f7 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -313,7 +313,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
     for t in "sslProfile", "authServicePlugin", "listener", "connector", \
              "router.config.address", "router.config.linkRoute", "router.config.autoLink", \
              "router.config.exchange", "router.config.binding", \
-             "vhost", "httpListener", "httpConnector":
+             "vhost", "httpListener", "httpConnector", "tcpListener", "tcpConnector":
         for a in config.by_type(t):
             configure(a)
             if t == "sslProfile":
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f7ac67e..b780011 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -43,6 +43,7 @@ set(qpid_dispatch_SOURCES
   adaptors/http_adaptor.c
   adaptors/http1/http1_lib.c
   adaptors/http1/http1_adaptor.c
+  adaptors/tcp_adaptor.c
   alloc_pool.c
   amqp.c
   bitmask.c
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
new file mode 100644
index 0000000..27dcb5c
--- /dev/null
+++ b/src/adaptors/tcp_adaptor.c
@@ -0,0 +1,917 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/condition.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/raw_connection.h>
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/protocol_adaptor.h"
+#include "delivery.h"
+#include "tcp_adaptor.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+ALLOC_DEFINE(qd_tcp_listener_t);
+ALLOC_DEFINE(qd_tcp_connector_t);
+
+typedef struct qdr_tcp_adaptor_t {
+    qdr_core_t              *core;
+    qdr_protocol_adaptor_t  *adaptor;
+    qd_tcp_listener_list_t   listeners;
+    qd_tcp_connector_list_t  connectors;
+    qd_log_source_t         *log_source;
+} qdr_tcp_adaptor_t;
+
+static qdr_tcp_adaptor_t *tcp_adaptor;
+
+#define READ_BUFFERS 4
+#define WRITE_BUFFERS 4
+
+typedef struct qdr_tcp_connection_t {
+    qd_handler_context_t  context;
+    char                 *reply_to;
+    qdr_connection_t     *conn;
+    uint64_t              conn_id;
+    qdr_link_t           *incoming;
+    uint64_t              incoming_id;
+    qdr_link_t           *outgoing;
+    uint64_t              outgoing_id;
+    pn_raw_connection_t  *socket;
+    qdr_delivery_t       *instream;
+    qdr_delivery_t       *outstream;
+    bool                  ingress;
+    bool                  egress_dispatcher;
+    bool                  connector_closed;//only used if egress_dispatcher=true
+    qd_timer_t           *activate_timer;
+    qd_bridge_config_t   config;
+    qd_server_t          *server;
+    char                 *remote_address;
+} qdr_tcp_connection_t;
+
+static void handle_disconnected(qdr_tcp_connection_t* conn);
+static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
+
+static void on_activate(void *context)
+{
+    qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
+
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] on_activate", conn->conn_id);
+    while (qdr_connection_process(conn->conn)) {}
+    if (conn->egress_dispatcher && conn->connector_closed) {
+        qdr_connection_closed(conn->conn);
+        qdr_connection_set_context(conn->conn, 0);
+        free_qdr_tcp_connection(conn);
+    }
+}
+
+static void grant_read_buffers(qdr_tcp_connection_t *conn)
+{
+    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+    // Give proactor more read buffers for the socket
+    if (!pn_raw_connection_is_read_closed(conn->socket)) {
+        size_t desired = pn_raw_connection_read_buffers_capacity(conn->socket);
+        while (desired) {
+            size_t i;
+            for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+                qd_buffer_t *buf = qd_buffer();
+                raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
+                raw_buffers[i].capacity = qd_buffer_capacity(buf);
+                raw_buffers[i].size = 0;
+                raw_buffers[i].offset = 0;
+                raw_buffers[i].context = (uintptr_t) buf;
+            }
+            desired -= i;
+            pn_raw_connection_give_read_buffers(conn->socket, raw_buffers, i);
+        }
+    }
+}
+
+static int handle_incoming(qdr_tcp_connection_t *conn)
+{
+    qd_buffer_list_t buffers;
+    DEQ_INIT(buffers);
+    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+    size_t n;
+    int count = 0;
+    while ( (n = pn_raw_connection_take_read_buffers(conn->socket, raw_buffers, READ_BUFFERS)) ) {
+        for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
+            qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
+            qd_buffer_insert(buf, raw_buffers[i].size);
+            count += raw_buffers[i].size;
+            DEQ_INSERT_TAIL(buffers, buf);
+        }
+    }
+
+    grant_read_buffers(conn);
+
+    if (conn->instream) {
+        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+        qd_compose_insert_binary_buffers(field, &buffers);
+        qd_message_extend(qdr_delivery_message(conn->instream), field);
+        qd_compose_free(field);
+        qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
+    } else {
+        qd_message_t *msg = qd_message();
+
+        qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+        qd_compose_start_list(props);
+        qd_compose_insert_null(props);                      // message-id
+        qd_compose_insert_null(props);                      // user-id
+        qd_compose_insert_null(props);                      // to
+        qd_compose_insert_null(props);                      // subject
+        qd_compose_insert_string(props, conn->reply_to);    // reply-to
+        //qd_compose_insert_null(props);                      // correlation-id
+        //qd_compose_insert_null(props);                      // content-type
+        //qd_compose_insert_null(props);                      // content-encoding
+        //qd_compose_insert_timestamp(props, 0);              // absolute-expiry-time
+        //qd_compose_insert_timestamp(props, 0);              // creation-time
+        //qd_compose_insert_null(props);                      // group-id
+        //qd_compose_insert_uint(props, 0);                   // group-sequence
+        //qd_compose_insert_null(props);                      // reply-to-group-id
+        qd_compose_end_list(props);
+
+        if (count > 0) {
+            props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
+            qd_compose_insert_binary_buffers(props, &buffers);
+        }
+
+        qd_message_compose_2(msg, props, false);
+        qd_compose_free(props);
+
+        conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count);
+    }
+    return count;
+}
+
+static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
+{
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Freeing %p", (void*) tc);
+    if (tc->reply_to) {
+        free(tc->reply_to);
+    }
+    if(tc->remote_address) {
+        free(tc->remote_address);
+    }
+    if (tc->activate_timer) {
+        qd_timer_free(tc->activate_timer);
+    }
+    //proactor will free the socket
+    free(tc);
+}
+
+static void handle_disconnected(qdr_tcp_connection_t* conn)
+{
+    if (conn->instream) {
+        qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
+        qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
+    }
+    qdr_connection_closed(conn->conn);
+    qdr_connection_set_context(conn->conn, 0);
+    free_qdr_tcp_connection(conn);
+}
+
+static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
+{
+    int used = 0;
+    qd_message_body_data_t *body_data;
+    while (used < count) {
+        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data);
+        if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
+            used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used);
+            if (used > 0) {
+                buffers[used-1].context = (uintptr_t) body_data;
+            }
+        } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+            return used;
+        } else {
+            switch (body_data_result) {
+            case QD_MESSAGE_BODY_DATA_NO_MORE:
+                qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] EOS", conn->conn_id); break;
+            case QD_MESSAGE_BODY_DATA_INVALID:
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body data for streaming message", conn->conn_id); break;
+            case QD_MESSAGE_BODY_DATA_NOT_DATA:
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body; expected data section", conn->conn_id); break;
+            default:
+                break;
+            }
+            qd_message_set_send_complete(msg);
+            return -1;
+        }
+    }
+    return used;
+}
+
+static void handle_outgoing(qdr_tcp_connection_t *conn)
+{
+    if (conn->outstream) {
+        qd_message_t *msg = qdr_delivery_message(conn->outstream);
+        pn_raw_buffer_t buffs[WRITE_BUFFERS];
+        for (int i = 0; i < WRITE_BUFFERS; i++) {
+            buffs[i].context = 0;
+            buffs[i].bytes = 0;
+            buffs[i].capacity = 0;
+            buffs[i].size = 0;
+            buffs[i].offset = 0;
+        }
+        int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS);
+        if (n > 0) {
+            size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n);
+            int bytes_written = 0;
+            for (size_t i = 0; i < used; i++) {
+                if (buffs[i].bytes) {
+                    bytes_written += buffs[i].size;
+                } else {
+                    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] empty buffer can't be written (%i of %i)", conn->conn_id, i+1, used);
+                }
+            }
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Writing %i bytes", conn->conn_id, bytes_written);
+        }
+        if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
+            pn_raw_connection_close(conn->socket);
+        }
+    }
+}
+
+static char *get_address_string(pn_raw_connection_t *socket)
+{
+    const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket);
+    char buffer[1024];
+    int len = pn_netaddr_str(netaddr, buffer, 1024);
+    if (len <= 1024) {
+        return strdup(buffer);
+    } else {
+        return strndup(buffer, 1024);
+    }
+}
+
+static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
+{
+    tc->remote_address = get_address_string(tc->socket);
+    qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
+                                                      false, //bool             is_authenticated,
+                                                      true,  //bool             opened,
+                                                      "",   //char            *sasl_mechanisms,
+                                                      QD_INCOMING, //qd_direction_t   dir,
+                                                      tc->remote_address,    //const char      *host,
+                                                      "",    //const char      *ssl_proto,
+                                                      "",    //const char      *ssl_cipher,
+                                                      "",    //const char      *user,
+                                                      "TcpAdaptor",    //const char      *container,
+                                                      pn_data(0),     //pn_data_t       *connection_properties,
+                                                      0,     //int              ssl_ssf,
+                                                      false, //bool             ssl,
+                                                      // set if remote is a qdrouter
+                                                      0);    //const qdr_router_version_t *version)
+
+    tc->conn_id = qd_server_allocate_connection_id(tc->server);
+    qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
+                                                   tcp_adaptor->adaptor,
+                                                   true,
+                                                   QDR_ROLE_NORMAL,
+                                                   1,
+                                                   tc->conn_id,
+                                                   0,
+                                                   0,
+                                                   false,
+                                                   false,
+                                                   false,
+                                                   false,
+                                                   250,
+                                                   0,
+                                                   info,
+                                                   0,
+                                                   0);
+    tc->conn = conn;
+    qdr_connection_set_context(conn, tc);
+
+    qdr_terminus_t *dynamic_source = qdr_terminus(0);
+    qdr_terminus_set_dynamic(dynamic_source);
+    qdr_terminus_t *target = qdr_terminus(0);
+    qdr_terminus_set_address(target, tc->config.address);
+
+    tc->outgoing = qdr_link_first_attach(conn,
+                                         QD_OUTGOING,
+                                         dynamic_source,   //qdr_terminus_t   *source,
+                                         qdr_terminus(0),  //qdr_terminus_t   *target,
+                                         "tcp.ingress.out",        //const char       *name,
+                                         0,                //const char       *terminus_addr,
+                                         false,
+                                         NULL,
+                                         &(tc->outgoing_id));
+    qdr_link_set_context(tc->outgoing, tc);
+    tc->incoming = qdr_link_first_attach(conn,
+                                         QD_INCOMING,
+                                         qdr_terminus(0),  //qdr_terminus_t   *source,
+                                         target,           //qdr_terminus_t   *target,
+                                         "tcp.ingress.in",         //const char       *name,
+                                         0,                //const char       *terminus_addr,
+                                         false,
+                                         NULL,
+                                         &(tc->incoming_id));
+    qdr_link_set_context(tc->incoming, tc);
+
+    grant_read_buffers(tc);
+}
+
+static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+    qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) context;
+    qd_log_source_t *log = tcp_adaptor->log_source;
+    switch (pn_event_type(e)) {
+    case PN_RAW_CONNECTION_CONNECTED: {
+        if (conn->ingress) {
+            qdr_tcp_connection_ingress_accept(conn);
+            qd_log(log, QD_LOG_INFO, "[C%i] Accepted from %s", conn->conn_id, conn->remote_address);
+            break;
+        } else {
+            qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
+            qdr_connection_process(conn->conn);
+            break;
+        }
+    }
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for reading", conn->conn_id);
+        pn_raw_connection_close(conn->socket);
+        break;
+    }
+    case PN_RAW_CONNECTION_CLOSED_WRITE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for writing", conn->conn_id);
+        pn_raw_connection_close(conn->socket);
+        break;
+    }
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+        qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", conn->conn_id);
+        handle_disconnected(conn);
+        break;
+    }
+    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Need write buffers", conn->conn_id);
+        while (qdr_connection_process(conn->conn)) {}
+        break;
+    }
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Need read buffers", conn->conn_id);
+        while (qdr_connection_process(conn->conn)) {}
+        break;
+    }
+    case PN_RAW_CONNECTION_WAKE: {
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Wake-up", conn->conn_id);
+        while (qdr_connection_process(conn->conn)) {}
+        break;
+    }
+    case PN_RAW_CONNECTION_READ: {
+        int read = handle_incoming(conn);
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Read %i bytes", conn->conn_id, read);
+        while (qdr_connection_process(conn->conn)) {}
+        break;
+    }
+    case PN_RAW_CONNECTION_WRITTEN: {
+        pn_raw_buffer_t buffs[WRITE_BUFFERS];
+        size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+        size_t n;
+        size_t written = 0;
+        while ( (n = pn_raw_connection_take_written_buffers(conn->socket, buffs, WRITE_BUFFERS)) ) {
+            for (size_t i = 0; i < n; ++i) {
+                written += buffs[i].size;
+                if (buffs[i].context) {
+                    qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context);
+                }
+            }
+        }
+        qd_log(log, QD_LOG_DEBUG, "[C%i] Wrote %i bytes", conn->conn_id, written);
+        while (qdr_connection_process(conn->conn)) {}
+        break;
+    }
+    default:
+        break;
+    }
+}
+
+static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener)
+{
+    qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+    ZERO(tc);
+    tc->ingress = true;
+    tc->context.context = tc;
+    tc->context.handler = &handle_connection_event;
+    tc->config = listener->config;
+    tc->server = listener->server;
+    tc->socket = pn_raw_connection();
+    pn_raw_connection_set_context(tc->socket, tc);
+    //the following call will cause a PN_RAW_CONNECTION_CONNECTED
+    //event on another thread, which is where the rest of the
+    //initialisation will happen, through a call to
+    //qdr_tcp_connection_ingress_accept
+    pn_listener_raw_accept(listener->pn_listener, tc->socket);
+    return tc;
+}
+
+static void tcp_connector_establish(qdr_tcp_connection_t *conn)
+{
+    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config.host_port);
+    conn->socket = pn_raw_connection();
+    pn_raw_connection_set_context(conn->socket, conn);
+    pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port);
+}
+
+static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+{
+    qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+    ZERO(tc);
+    if (initial_delivery) {
+        tc->egress_dispatcher = false;
+    } else {
+        tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
+        tc->egress_dispatcher = true;
+    }
+    tc->ingress = false;
+    tc->context.context = tc;
+    tc->context.handler = &handle_connection_event;
+    tc->config = *config;
+    tc->server = server;
+    qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
+                                                      false, //bool             is_authenticated,
+                                                      true,  //bool             opened,
+                                                      "",   //char            *sasl_mechanisms,
+                                                      QD_OUTGOING, //qd_direction_t   dir,
+                                                      tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port,    //const char      *host,
+                                                      "",    //const char      *ssl_proto,
+                                                      "",    //const char      *ssl_cipher,
+                                                      "",    //const char      *user,
+                                                      "TcpAdaptor",    //const char      *container,
+                                                      pn_data(0),     //pn_data_t       *connection_properties,
+                                                      0,     //int              ssl_ssf,
+                                                      false, //bool             ssl,
+                                                      // set if remote is a qdrouter
+                                                      0);    //const qdr_router_version_t *version)
+
+    tc->conn_id = qd_server_allocate_connection_id(tc->server);
+    qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
+                                                   tcp_adaptor->adaptor,
+                                                   false,
+                                                   QDR_ROLE_NORMAL,
+                                                   1,
+                                                   tc->conn_id,
+                                                   0,
+                                                   0,
+                                                   false,
+                                                   false,
+                                                   false,
+                                                   false,
+                                                   250,
+                                                   0,
+                                                   info,
+                                                   0,
+                                                   0);
+    tc->conn = conn;
+    qdr_connection_set_context(conn, tc);
+
+    qdr_terminus_t *source = qdr_terminus(0);
+    qdr_terminus_set_address(source, tc->config.address);
+
+    tc->outgoing = qdr_link_first_attach(conn,
+                          QD_OUTGOING,
+                          source,           //qdr_terminus_t   *source,
+                          qdr_terminus(0),  //qdr_terminus_t   *target,
+                          "tcp.egress.out", //const char       *name,
+                          0,                //const char       *terminus_addr,
+                          !(tc->egress_dispatcher),
+                          initial_delivery,
+                          &(tc->outgoing_id));
+    qdr_link_set_context(tc->outgoing, tc);
+    //the incoming link for egress is created once we receive the
+    //message which has the reply to address (and read buffers are
+    //granted at that point)
+    if (!tc->egress_dispatcher) {
+        tcp_connector_establish(tc);
+    }
+
+    return tc;
+}
+
+static void free_bridge_config(qd_bridge_config_t *config)
+{
+    if (!config) return;
+    free(config->host);
+    free(config->port);
+    free(config->name);
+    free(config->site_id);
+    free(config->host_port);
+}
+
+#define CHECK() if (qd_error_code()) goto error
+
+static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity, bool is_listener)
+{
+    qd_error_clear();
+    ZERO(config);
+
+    config->name                 = qd_entity_get_string(entity, "name");              CHECK();
+    config->address              = qd_entity_get_string(entity, "address");           CHECK();
+    config->host                 = qd_entity_get_string(entity, "host");              CHECK();
+    config->port                 = qd_entity_get_string(entity, "port");              CHECK();
+    config->site_id              = qd_entity_opt_string(entity, "site-id", 0);        CHECK();
+
+    int hplen = strlen(config->host) + strlen(config->port) + 2;
+    config->host_port = malloc(hplen);
+    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
+
+    return QD_ERROR_NONE;
+
+ error:
+    free_bridge_config(config);
+    return qd_error_code();
+}
+
+static void log_tcp_bridge_config(qd_log_source_t *log, qd_bridge_config_t *c, const char *what) {
+    qd_log(log, QD_LOG_INFO, "Configured %s for %s, %s:%s", what, c->address, c->host, c->port);
+}
+
+void qd_tcp_listener_decref(qd_tcp_listener_t* li)
+{
+    if (li && sys_atomic_dec(&li->ref_count) == 1) {
+        free_bridge_config(&li->config);
+        free_qd_tcp_listener_t(li);
+    }
+}
+
+static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *context) {
+    qd_log_source_t *log = tcp_adaptor->log_source;
+
+    qd_tcp_listener_t *li = (qd_tcp_listener_t*) context;
+    const char *host_port = li->config.host_port;
+
+    switch (pn_event_type(e)) {
+
+    case PN_LISTENER_OPEN: {
+        qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port);
+        break;
+    }
+
+    case PN_LISTENER_ACCEPT: {
+        qd_log(log, QD_LOG_INFO, "Accepting TCP connection on %s", host_port);
+        qdr_tcp_connection_ingress(li);
+        break;
+    }
+
+    case PN_LISTENER_CLOSE:
+        if (li->pn_listener) {
+            pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+            if (pn_condition_is_set(cond)) {
+                qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+                       pn_condition_get_description(cond),
+                       pn_condition_get_name(cond));
+            } else {
+                qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
+            }
+            pn_listener_set_context(li->pn_listener, 0);
+            li->pn_listener = 0;
+            qd_tcp_listener_decref(li);
+        }
+        break;
+
+    default:
+        break;
+    }
+}
+
+static qd_tcp_listener_t *qd_tcp_listener(qd_server_t *server)
+{
+    qd_tcp_listener_t *li = new_qd_tcp_listener_t();
+    if (!li) return 0;
+    ZERO(li);
+    sys_atomic_init(&li->ref_count, 1);
+    li->server = server;
+    li->context.context = li;
+    li->context.handler = &handle_listener_event;
+    return li;
+}
+
+static const int BACKLOG = 50;  /* Listening backlog */
+
+static bool tcp_listener_listen(qd_tcp_listener_t *li) {
+   li->pn_listener = pn_listener();
+    if (li->pn_listener) {
+        pn_listener_set_context(li->pn_listener, &li->context);
+        pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
+        sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+        /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s",
+               li->config.host_port);
+     }
+    return li->pn_listener;
+}
+
+qd_tcp_listener_t *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_tcp_listener_t *li = qd_tcp_listener(qd->server);
+    if (!li || load_bridge_config(qd, &li->config, entity, true) != QD_ERROR_NONE) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp listener: %s", qd_error_message());
+        qd_tcp_listener_decref(li);
+        return 0;
+    }
+    DEQ_ITEM_INIT(li);
+    DEQ_INSERT_TAIL(tcp_adaptor->listeners, li);
+    log_tcp_bridge_config(tcp_adaptor->log_source, &li->config, "TcpListener");
+    tcp_listener_listen(li);
+    return li;
+}
+
+void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
+{
+    qd_tcp_listener_t *li = (qd_tcp_listener_t*) impl;
+    if (li) {
+        if (li->pn_listener) {
+            pn_listener_close(li->pn_listener);
+        }
+        DEQ_REMOVE(tcp_adaptor->listeners, li);
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+        qd_tcp_listener_decref(li);
+    }
+}
+
+qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
+}
+
+static qd_tcp_connector_t *qd_tcp_connector(qd_server_t *server)
+{
+    qd_tcp_connector_t *c = new_qd_tcp_connector_t();
+    if (!c) return 0;
+    ZERO(c);
+    sys_atomic_init(&c->ref_count, 1);
+    c->server      = server;
+    return c;
+}
+
+void qd_tcp_connector_decref(qd_tcp_connector_t* c)
+{
+    if (c && sys_atomic_dec(&c->ref_count) == 1) {
+        free_bridge_config(&c->config);
+        free_qd_tcp_connector_t(c);
+    }
+}
+
+qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_tcp_connector_t *c = qd_tcp_connector(qd->server);
+    if (!c || load_bridge_config(qd, &c->config, entity, true) != QD_ERROR_NONE) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message());
+        qd_tcp_connector_decref(c);
+        return 0;
+    }
+    DEQ_ITEM_INIT(c);
+    DEQ_INSERT_TAIL(tcp_adaptor->connectors, c);
+    log_tcp_bridge_config(tcp_adaptor->log_source, &c->config, "TcpConnector");
+    c->dispatcher = qdr_tcp_connection_egress(&(c->config), c->server, NULL);
+    return c;
+}
+
+static void close_egress_dispatcher(qdr_tcp_connection_t *context)
+{
+    //actual close needs to happen on connection thread
+    context->connector_closed = true;
+    qd_timer_schedule(context->activate_timer, 0);
+}
+
+void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
+{
+    qd_tcp_connector_t *ct = (qd_tcp_connector_t*) impl;
+    if (ct) {
+        //need to close the pseudo-connection used for dispatching
+        //deliveries out to live connnections:
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
+        close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher);
+        DEQ_REMOVE(tcp_adaptor->connectors, ct);
+        qd_tcp_connector_decref(ct);
+    }
+}
+
+qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
+}
+
+static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
+                                 qdr_terminus_t *source, qdr_terminus_t *target,
+                                 qd_session_class_t session_class)
+{
+}
+
+static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to)
+{
+    int length = qd_iterator_length(reply_to);
+    tc->reply_to = malloc(length + 1);
+    qd_iterator_strncpy(reply_to, tc->reply_to, length + 1);
+}
+
+static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
+                                  qdr_terminus_t *source, qdr_terminus_t *target)
+{
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+        if (qdr_link_direction(link) == QD_OUTGOING) {
+            if (tc->ingress) {
+                qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source));
+                // for ingress, can start reading from socket once we have
+                // a reply to address, as that is when we are able to send
+                // out a message
+                grant_read_buffers(tc);
+                handle_incoming(tc);
+            }
+            qdr_link_flow(tcp_adaptor->core, link, 10, false);
+        } else if (!tc->ingress) {
+            //for egress we can start reading from the socket once we
+            //have the link to send messages over
+            grant_read_buffers(tc);
+        }
+    }
+}
+
+
+static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+}
+
+
+static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
+{
+}
+
+
+static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qdr_tcp_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
+{
+}
+
+
+static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
+{
+    return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
+}
+
+
+static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
+{
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+            qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Delivery event", tc->conn_id, tc->outgoing_id);
+            if (tc->egress_dispatcher) {
+                qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+            } else if (!tc->outstream) {
+                tc->outstream = delivery;
+                if (!tc->ingress) {
+                    //on egress, can only set up link for the reverse
+                    //direction once we receive the first part of the
+                    //message from client to server
+                    qd_message_t *msg = qdr_delivery_message(delivery);
+                    qdr_tcp_connection_copy_reply_to(tc, qd_message_field_iterator(msg, QD_FIELD_REPLY_TO));
+                    qdr_terminus_t *target = qdr_terminus(0);
+                    qdr_terminus_set_address(target, tc->reply_to);
+                    tc->incoming = qdr_link_first_attach(tc->conn,
+                                                         QD_INCOMING,
+                                                         qdr_terminus(0),  //qdr_terminus_t   *source,
+                                                         target, //qdr_terminus_t   *target,
+                                                         "tcp.egress.in",  //const char       *name,
+                                                         0,                //const char       *terminus_addr,
+                                                         false,
+                                                         NULL,
+                                                         &(tc->incoming_id));
+                    qdr_link_set_context(tc->incoming, tc);
+                    handle_incoming(tc);
+                }
+            }
+            handle_outgoing(tc);
+    }
+    return 0;
+}
+
+
+static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
+{
+    return 10;
+}
+
+
+static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+    void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
+    if (link_context) {
+        qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Delivery update", tc->conn_id);
+    }
+}
+
+
+static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+}
+
+
+static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+}
+
+static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
+{
+    void *context = qdr_connection_get_context(c);
+    if (context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
+        if (conn->socket) {
+            pn_raw_connection_wake(conn->socket);
+        } else if (conn->activate_timer) {
+            // On egress, the raw connection is only created once the
+            // first part of the message encapsulating the
+            // client->server half of the stream has been
+            // received. Prior to that however a subscribing link (and
+            // its associated connection must be setup), for which we
+            // fake wakeup by using a timer.
+            qd_timer_schedule(conn->activate_timer, 0);
+        } else {
+            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
+        }
+    }
+}
+
+/**
+ * This initialization function will be invoked when the router core is ready for the protocol
+ * adaptor to be created.  This function must:
+ *
+ *   1) Register the protocol adaptor with the router-core.
+ *   2) Prepare the protocol adaptor to be configured.
+ */
+static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
+{
+    qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t);
+    adaptor->core    = core;
+    adaptor->adaptor = qdr_protocol_adaptor(core,
+                                            "tcp",                // name
+                                            adaptor,              // context
+                                            qdr_tcp_activate,                    // activate
+                                            qdr_tcp_first_attach,
+                                            qdr_tcp_second_attach,
+                                            qdr_tcp_detach,
+                                            qdr_tcp_flow,
+                                            qdr_tcp_offer,
+                                            qdr_tcp_drained,
+                                            qdr_tcp_drain,
+                                            qdr_tcp_push,
+                                            qdr_tcp_deliver,
+                                            qdr_tcp_get_credit,
+                                            qdr_tcp_delivery_update,
+                                            qdr_tcp_conn_close,
+                                            qdr_tcp_conn_trace);
+    adaptor->log_source  = qd_log_source("TCP_ADAPTOR");
+    DEQ_INIT(adaptor->listeners);
+    DEQ_INIT(adaptor->connectors);
+    *adaptor_context = adaptor;
+
+    tcp_adaptor = adaptor;
+}
+
+
+static void qdr_tcp_adaptor_final(void *adaptor_context)
+{
+    qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context;
+    qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+    free(adaptor);
+    tcp_adaptor =  NULL;
+}
+
+/**
+ * Declare the adaptor so that it will self-register on process startup.
+ */
+QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final)
diff --git a/src/adaptors/tcp_adaptor.h b/src/adaptors/tcp_adaptor.h
new file mode 100644
index 0000000..17bd7f9
--- /dev/null
+++ b/src/adaptors/tcp_adaptor.h
@@ -0,0 +1,79 @@
+#ifndef __tcp_adaptor_h__
+#define __tcp_adaptor_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/atomic.h>
+#include <qpid/dispatch/enum.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <proton/engine.h>
+#include <proton/event.h>
+#include <proton/ssl.h>
+
+#include "dispatch_private.h"
+#include "timer_private.h"
+
+typedef struct qd_tcp_listener_t qd_tcp_listener_t;
+typedef struct qd_tcp_connector_t qd_tcp_connector_t;
+typedef struct qd_bridge_config_t qd_bridge_config_t;
+
+struct qd_bridge_config_t
+{
+    char *name;
+    char *address;
+    char *host;
+    char *port;
+    char *site_id;
+    char *host_port;
+};
+
+struct qd_tcp_listener_t
+{
+    qd_handler_context_t      context;
+    /* May be referenced by connection_manager and pn_listener_t */
+    sys_atomic_t              ref_count;
+    qd_server_t              *server;
+    qd_bridge_config_t        config;
+    pn_listener_t            *pn_listener;
+
+    DEQ_LINKS(qd_tcp_listener_t);
+};
+
+DEQ_DECLARE(qd_tcp_listener_t, qd_tcp_listener_list_t);
+ALLOC_DECLARE(qd_tcp_listener_t);
+
+struct qd_tcp_connector_t
+{
+    /* May be referenced by connection_manager, timer and pn_connection_t */
+    sys_atomic_t              ref_count;
+    qd_server_t              *server;
+    qd_bridge_config_t        config;
+    void                     *dispatcher;
+
+    DEQ_LINKS(qd_tcp_connector_t);
+};
+
+DEQ_DECLARE(qd_tcp_connector_t, qd_tcp_connector_list_t);
+ALLOC_DECLARE(qd_tcp_connector_t);
+
+#endif


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 03/03: DISPATCH-1742: free protocol adaptor before core

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit cb7d9cd73b7b8f21e36837fec9e628b38d825416
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Aug 12 18:14:03 2020 +0100

    DISPATCH-1742: free protocol adaptor before core
---
 src/router_node.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/router_node.c b/src/router_node.c
index 993a235..aa69f29 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -2001,8 +2001,8 @@ void qd_router_free(qd_router_t *router)
 
     qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
 
-    qdr_core_free(router->router_core);
     qdr_protocol_adaptor_free(router->router_core, amqp_direct_adaptor);
+    qdr_core_free(router->router_core);
     qd_tracemask_free(router->tracemask);
     qd_timer_free(router->timer);
     sys_mutex_free(router->lock);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org