You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2020/08/12 17:16:24 UTC
[qpid-dispatch] 01/03: DISPATCH-1654: Initial TCP adaptor
This is an automated email from the ASF dual-hosted git repository.
gsim pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit c0fb276ecc3a193e9f03790cd3b4f910677cbe3a
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Tue Aug 11 11:32:33 2020 +0100
DISPATCH-1654: Initial TCP adaptor
---
python/qpid_dispatch/management/qdrouter.json | 61 ++
python/qpid_dispatch_internal/dispatch.py | 2 +
python/qpid_dispatch_internal/management/agent.py | 28 +
python/qpid_dispatch_internal/management/config.py | 2 +-
src/CMakeLists.txt | 1 +
src/adaptors/tcp_adaptor.c | 917 +++++++++++++++++++++
src/adaptors/tcp_adaptor.h | 79 ++
7 files changed, 1089 insertions(+), 1 deletion(-)
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 421fa0a..0bb2bec 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1151,6 +1151,66 @@
}
},
+ "tcpListener": {
+ "description": "Ingress TCP bridge.",
+ "extends": "configurationEntity",
+ "operations": ["CREATE", "DELETE"],
+ "attributes": {
+ "address": {
+ "description":"Address of this bridge",
+ "type": "string",
+ "create": true
+ },
+ "host": {
+ "description":"A host name, IPV4 or IPV6 literal, or the empty string. The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.",
+ "type": "string",
+ "default": "0.0.0.0",
+ "create": true
+ },
+ "port": {
+ "description": "Port number or symbolic service name. If '0', the router shall assign an ephemeral port to the listener and log the port number with a log of the form 'SERVER (notice) Listening on <host>:<assigned-port> (<listener-name>)'",
+ "type": "string",
+ "create": true
+ },
+ "siteId": {
+ "type": "string",
+ "required": false,
+ "description": "Used to identify where connection is handled.",
+ "create": true
+ }
+ }
+ },
+
+ "tcpConnector": {
+ "description": "Egress TCP bridge.",
+ "extends": "configurationEntity",
+ "operations": ["CREATE", "DELETE"],
+ "attributes": {
+ "address": {
+ "description":"Address of this bridge",
+ "type": "string",
+ "create": true
+ },
+ "host": {
+ "description":"IP address: ipv4 or ipv6 literal or a host name",
+ "type": "string",
+ "create": true
+ },
+ "port": {
+ "description": "Port number or symbolic service name.",
+ "type": "string",
+ "create": true
+
+ },
+ "siteId": {
+ "type": "string",
+ "required": false,
+ "description": "Used to identify origin of connections.",
+ "create": true
+ }
+ }
+ },
+
"log": {
"description": "Configure logging for a particular module. You can use the `UPDATE` operation to change log settings while the router is running.",
"extends": "configurationEntity",
@@ -1174,6 +1234,7 @@
"CONN_MGR",
"PYTHON",
"PROTOCOL",
+ "TCP_ADAPTOR",
"HTTP_ADAPTOR",
"DEFAULT"
],
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index ad99749..f5bd91a 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -67,6 +67,8 @@ class QdDll(ctypes.PyDLL):
self._prototype(self.qd_dispatch_configure_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_ssl_profile, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_sasl_plugin, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+ self._prototype(self.qd_dispatch_configure_tcp_listener, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+ self._prototype(self.qd_dispatch_configure_tcp_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_http_lsnr, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_http_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, ctypes.c_void_p])
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index af6d8a2..ec5696c 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -586,6 +586,34 @@ class HttpListenerEntity(EntityAdapter):
def _delete(self):
self._qd.qd_dispatch_delete_http_listener(self._dispatch, self._implementations[0].key)
+class TcpListenerEntity(EntityAdapter):
+ def create(self):
+ config_listener = self._qd.qd_dispatch_configure_tcp_listener(self._dispatch, self)
+ return config_listener
+
+ def _identifier(self):
+ return _host_port_name_identifier(self)
+
+ def __str__(self):
+ return super(TcpListenerEntity, self).__str__().replace("Entity(", "TcpListenerEntity(")
+
+ def _delete(self):
+ self._qd.qd_dispatch_delete_tcp_listener(self._dispatch, self._implementations[0].key)
+
+
+class TcpConnectorEntity(EntityAdapter):
+ def create(self):
+ config_connector = self._qd.qd_dispatch_configure_tcp_connector(self._dispatch, self)
+ return config_connector
+
+ def _identifier(self):
+ return _host_port_name_identifier(self)
+
+ def __str__(self):
+ return super(TcpConnectorEntity, self).__str__().replace("Entity(", "TcpConnectorEntity(")
+
+ def _delete(self):
+ self._qd.qd_dispatch_delete_tcp_connector(self._dispatch, self._implementations[0].key)
class HttpConnectorEntity(EntityAdapter):
def create(self):
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 0edfebb..21016f7 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -313,7 +313,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
for t in "sslProfile", "authServicePlugin", "listener", "connector", \
"router.config.address", "router.config.linkRoute", "router.config.autoLink", \
"router.config.exchange", "router.config.binding", \
- "vhost", "httpListener", "httpConnector":
+ "vhost", "httpListener", "httpConnector", "tcpListener", "tcpConnector":
for a in config.by_type(t):
configure(a)
if t == "sslProfile":
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f7ac67e..b780011 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -43,6 +43,7 @@ set(qpid_dispatch_SOURCES
adaptors/http_adaptor.c
adaptors/http1/http1_lib.c
adaptors/http1/http1_adaptor.c
+ adaptors/tcp_adaptor.c
alloc_pool.c
amqp.c
bitmask.c
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
new file mode 100644
index 0000000..27dcb5c
--- /dev/null
+++ b/src/adaptors/tcp_adaptor.c
@@ -0,0 +1,917 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/condition.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/raw_connection.h>
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/protocol_adaptor.h"
+#include "delivery.h"
+#include "tcp_adaptor.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+ALLOC_DEFINE(qd_tcp_listener_t);
+ALLOC_DEFINE(qd_tcp_connector_t);
+
+typedef struct qdr_tcp_adaptor_t {
+ qdr_core_t *core;
+ qdr_protocol_adaptor_t *adaptor;
+ qd_tcp_listener_list_t listeners;
+ qd_tcp_connector_list_t connectors;
+ qd_log_source_t *log_source;
+} qdr_tcp_adaptor_t;
+
+static qdr_tcp_adaptor_t *tcp_adaptor;
+
+#define READ_BUFFERS 4
+#define WRITE_BUFFERS 4
+
+typedef struct qdr_tcp_connection_t {
+ qd_handler_context_t context;
+ char *reply_to;
+ qdr_connection_t *conn;
+ uint64_t conn_id;
+ qdr_link_t *incoming;
+ uint64_t incoming_id;
+ qdr_link_t *outgoing;
+ uint64_t outgoing_id;
+ pn_raw_connection_t *socket;
+ qdr_delivery_t *instream;
+ qdr_delivery_t *outstream;
+ bool ingress;
+ bool egress_dispatcher;
+ bool connector_closed;//only used if egress_dispatcher=true
+ qd_timer_t *activate_timer;
+ qd_bridge_config_t config;
+ qd_server_t *server;
+ char *remote_address;
+} qdr_tcp_connection_t;
+
+static void handle_disconnected(qdr_tcp_connection_t* conn);
+static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
+
+static void on_activate(void *context)
+{
+ qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
+
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] on_activate", conn->conn_id);
+ while (qdr_connection_process(conn->conn)) {}
+ if (conn->egress_dispatcher && conn->connector_closed) {
+ qdr_connection_closed(conn->conn);
+ qdr_connection_set_context(conn->conn, 0);
+ free_qdr_tcp_connection(conn);
+ }
+}
+
+static void grant_read_buffers(qdr_tcp_connection_t *conn)
+{
+ pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+ // Give proactor more read buffers for the socket
+ if (!pn_raw_connection_is_read_closed(conn->socket)) {
+ size_t desired = pn_raw_connection_read_buffers_capacity(conn->socket);
+ while (desired) {
+ size_t i;
+ for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+ qd_buffer_t *buf = qd_buffer();
+ raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
+ raw_buffers[i].capacity = qd_buffer_capacity(buf);
+ raw_buffers[i].size = 0;
+ raw_buffers[i].offset = 0;
+ raw_buffers[i].context = (uintptr_t) buf;
+ }
+ desired -= i;
+ pn_raw_connection_give_read_buffers(conn->socket, raw_buffers, i);
+ }
+ }
+}
+
+static int handle_incoming(qdr_tcp_connection_t *conn)
+{
+ qd_buffer_list_t buffers;
+ DEQ_INIT(buffers);
+ pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+ size_t n;
+ int count = 0;
+ while ( (n = pn_raw_connection_take_read_buffers(conn->socket, raw_buffers, READ_BUFFERS)) ) {
+ for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
+ qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
+ qd_buffer_insert(buf, raw_buffers[i].size);
+ count += raw_buffers[i].size;
+ DEQ_INSERT_TAIL(buffers, buf);
+ }
+ }
+
+ grant_read_buffers(conn);
+
+ if (conn->instream) {
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary_buffers(field, &buffers);
+ qd_message_extend(qdr_delivery_message(conn->instream), field);
+ qd_compose_free(field);
+ qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
+ } else {
+ qd_message_t *msg = qd_message();
+
+ qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+ qd_compose_start_list(props);
+ qd_compose_insert_null(props); // message-id
+ qd_compose_insert_null(props); // user-id
+ qd_compose_insert_null(props); // to
+ qd_compose_insert_null(props); // subject
+ qd_compose_insert_string(props, conn->reply_to); // reply-to
+ //qd_compose_insert_null(props); // correlation-id
+ //qd_compose_insert_null(props); // content-type
+ //qd_compose_insert_null(props); // content-encoding
+ //qd_compose_insert_timestamp(props, 0); // absolute-expiry-time
+ //qd_compose_insert_timestamp(props, 0); // creation-time
+ //qd_compose_insert_null(props); // group-id
+ //qd_compose_insert_uint(props, 0); // group-sequence
+ //qd_compose_insert_null(props); // reply-to-group-id
+ qd_compose_end_list(props);
+
+ if (count > 0) {
+ props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
+ qd_compose_insert_binary_buffers(props, &buffers);
+ }
+
+ qd_message_compose_2(msg, props, false);
+ qd_compose_free(props);
+
+ conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0);
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count);
+ }
+ return count;
+}
+
+static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
+{
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Freeing %p", (void*) tc);
+ if (tc->reply_to) {
+ free(tc->reply_to);
+ }
+ if(tc->remote_address) {
+ free(tc->remote_address);
+ }
+ if (tc->activate_timer) {
+ qd_timer_free(tc->activate_timer);
+ }
+ //proactor will free the socket
+ free(tc);
+}
+
+static void handle_disconnected(qdr_tcp_connection_t* conn)
+{
+ if (conn->instream) {
+ qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
+ qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
+ }
+ qdr_connection_closed(conn->conn);
+ qdr_connection_set_context(conn->conn, 0);
+ free_qdr_tcp_connection(conn);
+}
+
+static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
+{
+ int used = 0;
+ qd_message_body_data_t *body_data;
+ while (used < count) {
+ qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data);
+ if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
+ used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used);
+ if (used > 0) {
+ buffers[used-1].context = (uintptr_t) body_data;
+ }
+ } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+ return used;
+ } else {
+ switch (body_data_result) {
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] EOS", conn->conn_id); break;
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body data for streaming message", conn->conn_id); break;
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Invalid body; expected data section", conn->conn_id); break;
+ default:
+ break;
+ }
+ qd_message_set_send_complete(msg);
+ return -1;
+ }
+ }
+ return used;
+}
+
+static void handle_outgoing(qdr_tcp_connection_t *conn)
+{
+ if (conn->outstream) {
+ qd_message_t *msg = qdr_delivery_message(conn->outstream);
+ pn_raw_buffer_t buffs[WRITE_BUFFERS];
+ for (int i = 0; i < WRITE_BUFFERS; i++) {
+ buffs[i].context = 0;
+ buffs[i].bytes = 0;
+ buffs[i].capacity = 0;
+ buffs[i].size = 0;
+ buffs[i].offset = 0;
+ }
+ int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS);
+ if (n > 0) {
+ size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n);
+ int bytes_written = 0;
+ for (size_t i = 0; i < used; i++) {
+ if (buffs[i].bytes) {
+ bytes_written += buffs[i].size;
+ } else {
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] empty buffer can't be written (%i of %i)", conn->conn_id, i+1, used);
+ }
+ }
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Writing %i bytes", conn->conn_id, bytes_written);
+ }
+ if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
+ pn_raw_connection_close(conn->socket);
+ }
+ }
+}
+
+static char *get_address_string(pn_raw_connection_t *socket)
+{
+ const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket);
+ char buffer[1024];
+ int len = pn_netaddr_str(netaddr, buffer, 1024);
+ if (len <= 1024) {
+ return strdup(buffer);
+ } else {
+ return strndup(buffer, 1024);
+ }
+}
+
+static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
+{
+ tc->remote_address = get_address_string(tc->socket);
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_INCOMING, //qd_direction_t dir,
+ tc->remote_address, //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "TcpAdaptor", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ tc->conn_id = qd_server_allocate_connection_id(tc->server);
+ qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
+ tcp_adaptor->adaptor,
+ true,
+ QDR_ROLE_NORMAL,
+ 1,
+ tc->conn_id,
+ 0,
+ 0,
+ false,
+ false,
+ false,
+ false,
+ 250,
+ 0,
+ info,
+ 0,
+ 0);
+ tc->conn = conn;
+ qdr_connection_set_context(conn, tc);
+
+ qdr_terminus_t *dynamic_source = qdr_terminus(0);
+ qdr_terminus_set_dynamic(dynamic_source);
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, tc->config.address);
+
+ tc->outgoing = qdr_link_first_attach(conn,
+ QD_OUTGOING,
+ dynamic_source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "tcp.ingress.out", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(tc->outgoing_id));
+ qdr_link_set_context(tc->outgoing, tc);
+ tc->incoming = qdr_link_first_attach(conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "tcp.ingress.in", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(tc->incoming_id));
+ qdr_link_set_context(tc->incoming, tc);
+
+ grant_read_buffers(tc);
+}
+
+static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context)
+{
+ qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) context;
+ qd_log_source_t *log = tcp_adaptor->log_source;
+ switch (pn_event_type(e)) {
+ case PN_RAW_CONNECTION_CONNECTED: {
+ if (conn->ingress) {
+ qdr_tcp_connection_ingress_accept(conn);
+ qd_log(log, QD_LOG_INFO, "[C%i] Accepted from %s", conn->conn_id, conn->remote_address);
+ break;
+ } else {
+ qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
+ qdr_connection_process(conn->conn);
+ break;
+ }
+ }
+ case PN_RAW_CONNECTION_CLOSED_READ: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for reading", conn->conn_id);
+ pn_raw_connection_close(conn->socket);
+ break;
+ }
+ case PN_RAW_CONNECTION_CLOSED_WRITE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Closed for writing", conn->conn_id);
+ pn_raw_connection_close(conn->socket);
+ break;
+ }
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", conn->conn_id);
+ handle_disconnected(conn);
+ break;
+ }
+ case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Need write buffers", conn->conn_id);
+ while (qdr_connection_process(conn->conn)) {}
+ break;
+ }
+ case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Need read buffers", conn->conn_id);
+ while (qdr_connection_process(conn->conn)) {}
+ break;
+ }
+ case PN_RAW_CONNECTION_WAKE: {
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Wake-up", conn->conn_id);
+ while (qdr_connection_process(conn->conn)) {}
+ break;
+ }
+ case PN_RAW_CONNECTION_READ: {
+ int read = handle_incoming(conn);
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Read %i bytes", conn->conn_id, read);
+ while (qdr_connection_process(conn->conn)) {}
+ break;
+ }
+ case PN_RAW_CONNECTION_WRITTEN: {
+ pn_raw_buffer_t buffs[WRITE_BUFFERS];
+ size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+ size_t n;
+ size_t written = 0;
+ while ( (n = pn_raw_connection_take_written_buffers(conn->socket, buffs, WRITE_BUFFERS)) ) {
+ for (size_t i = 0; i < n; ++i) {
+ written += buffs[i].size;
+ if (buffs[i].context) {
+ qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context);
+ }
+ }
+ }
+ qd_log(log, QD_LOG_DEBUG, "[C%i] Wrote %i bytes", conn->conn_id, written);
+ while (qdr_connection_process(conn->conn)) {}
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener)
+{
+ qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+ ZERO(tc);
+ tc->ingress = true;
+ tc->context.context = tc;
+ tc->context.handler = &handle_connection_event;
+ tc->config = listener->config;
+ tc->server = listener->server;
+ tc->socket = pn_raw_connection();
+ pn_raw_connection_set_context(tc->socket, tc);
+ //the following call will cause a PN_RAW_CONNECTION_CONNECTED
+ //event on another thread, which is where the rest of the
+ //initialisation will happen, through a call to
+ //qdr_tcp_connection_ingress_accept
+ pn_listener_raw_accept(listener->pn_listener, tc->socket);
+ return tc;
+}
+
+static void tcp_connector_establish(qdr_tcp_connection_t *conn)
+{
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%i] Connecting to: %s", conn->conn_id, conn->config.host_port);
+ conn->socket = pn_raw_connection();
+ pn_raw_connection_set_context(conn->socket, conn);
+ pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port);
+}
+
+static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+{
+ qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+ ZERO(tc);
+ if (initial_delivery) {
+ tc->egress_dispatcher = false;
+ } else {
+ tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
+ tc->egress_dispatcher = true;
+ }
+ tc->ingress = false;
+ tc->context.context = tc;
+ tc->context.handler = &handle_connection_event;
+ tc->config = *config;
+ tc->server = server;
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_OUTGOING, //qd_direction_t dir,
+ tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port, //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "TcpAdaptor", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ tc->conn_id = qd_server_allocate_connection_id(tc->server);
+ qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
+ tcp_adaptor->adaptor,
+ false,
+ QDR_ROLE_NORMAL,
+ 1,
+ tc->conn_id,
+ 0,
+ 0,
+ false,
+ false,
+ false,
+ false,
+ 250,
+ 0,
+ info,
+ 0,
+ 0);
+ tc->conn = conn;
+ qdr_connection_set_context(conn, tc);
+
+ qdr_terminus_t *source = qdr_terminus(0);
+ qdr_terminus_set_address(source, tc->config.address);
+
+ tc->outgoing = qdr_link_first_attach(conn,
+ QD_OUTGOING,
+ source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "tcp.egress.out", //const char *name,
+ 0, //const char *terminus_addr,
+ !(tc->egress_dispatcher),
+ initial_delivery,
+ &(tc->outgoing_id));
+ qdr_link_set_context(tc->outgoing, tc);
+ //the incoming link for egress is created once we receive the
+ //message which has the reply to address (and read buffers are
+ //granted at that point)
+ if (!tc->egress_dispatcher) {
+ tcp_connector_establish(tc);
+ }
+
+ return tc;
+}
+
+static void free_bridge_config(qd_bridge_config_t *config)
+{
+ if (!config) return;
+ free(config->host);
+ free(config->port);
+ free(config->name);
+ free(config->site_id);
+ free(config->host_port);
+}
+
+#define CHECK() if (qd_error_code()) goto error
+
+static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity, bool is_listener)
+{
+ qd_error_clear();
+ ZERO(config);
+
+ config->name = qd_entity_get_string(entity, "name"); CHECK();
+ config->address = qd_entity_get_string(entity, "address"); CHECK();
+ config->host = qd_entity_get_string(entity, "host"); CHECK();
+ config->port = qd_entity_get_string(entity, "port"); CHECK();
+ config->site_id = qd_entity_opt_string(entity, "site-id", 0); CHECK();
+
+ int hplen = strlen(config->host) + strlen(config->port) + 2;
+ config->host_port = malloc(hplen);
+ snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
+
+ return QD_ERROR_NONE;
+
+ error:
+ free_bridge_config(config);
+ return qd_error_code();
+}
+
+static void log_tcp_bridge_config(qd_log_source_t *log, qd_bridge_config_t *c, const char *what) {
+ qd_log(log, QD_LOG_INFO, "Configured %s for %s, %s:%s", what, c->address, c->host, c->port);
+}
+
+void qd_tcp_listener_decref(qd_tcp_listener_t* li)
+{
+ if (li && sys_atomic_dec(&li->ref_count) == 1) {
+ free_bridge_config(&li->config);
+ free_qd_tcp_listener_t(li);
+ }
+}
+
+static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *context) {
+ qd_log_source_t *log = tcp_adaptor->log_source;
+
+ qd_tcp_listener_t *li = (qd_tcp_listener_t*) context;
+ const char *host_port = li->config.host_port;
+
+ switch (pn_event_type(e)) {
+
+ case PN_LISTENER_OPEN: {
+ qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port);
+ break;
+ }
+
+ case PN_LISTENER_ACCEPT: {
+ qd_log(log, QD_LOG_INFO, "Accepting TCP connection on %s", host_port);
+ qdr_tcp_connection_ingress(li);
+ break;
+ }
+
+ case PN_LISTENER_CLOSE:
+ if (li->pn_listener) {
+ pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+ if (pn_condition_is_set(cond)) {
+ qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+ pn_condition_get_description(cond),
+ pn_condition_get_name(cond));
+ } else {
+ qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
+ }
+ pn_listener_set_context(li->pn_listener, 0);
+ li->pn_listener = 0;
+ qd_tcp_listener_decref(li);
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+static qd_tcp_listener_t *qd_tcp_listener(qd_server_t *server)
+{
+ qd_tcp_listener_t *li = new_qd_tcp_listener_t();
+ if (!li) return 0;
+ ZERO(li);
+ sys_atomic_init(&li->ref_count, 1);
+ li->server = server;
+ li->context.context = li;
+ li->context.handler = &handle_listener_event;
+ return li;
+}
+
+static const int BACKLOG = 50; /* Listening backlog */
+
+static bool tcp_listener_listen(qd_tcp_listener_t *li) {
+ li->pn_listener = pn_listener();
+ if (li->pn_listener) {
+ pn_listener_set_context(li->pn_listener, &li->context);
+ pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
+ sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+ /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
+ } else {
+ qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s",
+ li->config.host_port);
+ }
+ return li->pn_listener;
+}
+
+qd_tcp_listener_t *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+ qd_tcp_listener_t *li = qd_tcp_listener(qd->server);
+ if (!li || load_bridge_config(qd, &li->config, entity, true) != QD_ERROR_NONE) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp listener: %s", qd_error_message());
+ qd_tcp_listener_decref(li);
+ return 0;
+ }
+ DEQ_ITEM_INIT(li);
+ DEQ_INSERT_TAIL(tcp_adaptor->listeners, li);
+ log_tcp_bridge_config(tcp_adaptor->log_source, &li->config, "TcpListener");
+ tcp_listener_listen(li);
+ return li;
+}
+
+void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
+{
+ qd_tcp_listener_t *li = (qd_tcp_listener_t*) impl;
+ if (li) {
+ if (li->pn_listener) {
+ pn_listener_close(li->pn_listener);
+ }
+ DEQ_REMOVE(tcp_adaptor->listeners, li);
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+ qd_tcp_listener_decref(li);
+ }
+}
+
+qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
+}
+
+static qd_tcp_connector_t *qd_tcp_connector(qd_server_t *server)
+{
+ qd_tcp_connector_t *c = new_qd_tcp_connector_t();
+ if (!c) return 0;
+ ZERO(c);
+ sys_atomic_init(&c->ref_count, 1);
+ c->server = server;
+ return c;
+}
+
+void qd_tcp_connector_decref(qd_tcp_connector_t* c)
+{
+ if (c && sys_atomic_dec(&c->ref_count) == 1) {
+ free_bridge_config(&c->config);
+ free_qd_tcp_connector_t(c);
+ }
+}
+
+qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+ qd_tcp_connector_t *c = qd_tcp_connector(qd->server);
+ if (!c || load_bridge_config(qd, &c->config, entity, true) != QD_ERROR_NONE) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message());
+ qd_tcp_connector_decref(c);
+ return 0;
+ }
+ DEQ_ITEM_INIT(c);
+ DEQ_INSERT_TAIL(tcp_adaptor->connectors, c);
+ log_tcp_bridge_config(tcp_adaptor->log_source, &c->config, "TcpConnector");
+ c->dispatcher = qdr_tcp_connection_egress(&(c->config), c->server, NULL);
+ return c;
+}
+
+static void close_egress_dispatcher(qdr_tcp_connection_t *context)
+{
+ //actual close needs to happen on connection thread
+ context->connector_closed = true;
+ qd_timer_schedule(context->activate_timer, 0);
+}
+
+void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
+{
+ qd_tcp_connector_t *ct = (qd_tcp_connector_t*) impl;
+ if (ct) {
+ //need to close the pseudo-connection used for dispatching
+ //deliveries out to live connnections:
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
+ close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher);
+ DEQ_REMOVE(tcp_adaptor->connectors, ct);
+ qd_tcp_connector_decref(ct);
+ }
+}
+
+qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
+}
+
+static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target,
+ qd_session_class_t session_class)
+{
+}
+
+static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to)
+{
+ int length = qd_iterator_length(reply_to);
+ tc->reply_to = malloc(length + 1);
+ qd_iterator_strncpy(reply_to, tc->reply_to, length + 1);
+}
+
+static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target)
+{
+ void* link_context = qdr_link_get_context(link);
+ if (link_context) {
+ qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+ if (qdr_link_direction(link) == QD_OUTGOING) {
+ if (tc->ingress) {
+ qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source));
+ // for ingress, can start reading from socket once we have
+ // a reply to address, as that is when we are able to send
+ // out a message
+ grant_read_buffers(tc);
+ handle_incoming(tc);
+ }
+ qdr_link_flow(tcp_adaptor->core, link, 10, false);
+ } else if (!tc->ingress) {
+ //for egress we can start reading from the socket once we
+ //have the link to send messages over
+ grant_read_buffers(tc);
+ }
+ }
+}
+
+
+static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+}
+
+
+static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
+{
+}
+
+
+static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qdr_tcp_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
+{
+}
+
+
+static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
+{
+ return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
+}
+
+
+static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
+{
+ void* link_context = qdr_link_get_context(link);
+ if (link_context) {
+ qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i][L%i] Delivery event", tc->conn_id, tc->outgoing_id);
+ if (tc->egress_dispatcher) {
+ qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+ } else if (!tc->outstream) {
+ tc->outstream = delivery;
+ if (!tc->ingress) {
+ //on egress, can only set up link for the reverse
+ //direction once we receive the first part of the
+ //message from client to server
+ qd_message_t *msg = qdr_delivery_message(delivery);
+ qdr_tcp_connection_copy_reply_to(tc, qd_message_field_iterator(msg, QD_FIELD_REPLY_TO));
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, tc->reply_to);
+ tc->incoming = qdr_link_first_attach(tc->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "tcp.egress.in", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(tc->incoming_id));
+ qdr_link_set_context(tc->incoming, tc);
+ handle_incoming(tc);
+ }
+ }
+ handle_outgoing(tc);
+ }
+ return 0;
+}
+
+
+static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
+{
+ return 10;
+}
+
+
+static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+ void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
+ if (link_context) {
+ qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Delivery update", tc->conn_id);
+ }
+}
+
+
+static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+}
+
+
+static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+}
+
+static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
+{
+ void *context = qdr_connection_get_context(c);
+ if (context) {
+ qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
+ if (conn->socket) {
+ pn_raw_connection_wake(conn->socket);
+ } else if (conn->activate_timer) {
+ // On egress, the raw connection is only created once the
+ // first part of the message encapsulating the
+ // client->server half of the stream has been
+ // received. Prior to that however a subscribing link (and
+ // its associated connection must be setup), for which we
+ // fake wakeup by using a timer.
+ qd_timer_schedule(conn->activate_timer, 0);
+ } else {
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
+ }
+ }
+}
+
+/**
+ * This initialization function will be invoked when the router core is ready for the protocol
+ * adaptor to be created. This function must:
+ *
+ * 1) Register the protocol adaptor with the router-core.
+ * 2) Prepare the protocol adaptor to be configured.
+ */
+static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
+{
+ qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t);
+ adaptor->core = core;
+ adaptor->adaptor = qdr_protocol_adaptor(core,
+ "tcp", // name
+ adaptor, // context
+ qdr_tcp_activate, // activate
+ qdr_tcp_first_attach,
+ qdr_tcp_second_attach,
+ qdr_tcp_detach,
+ qdr_tcp_flow,
+ qdr_tcp_offer,
+ qdr_tcp_drained,
+ qdr_tcp_drain,
+ qdr_tcp_push,
+ qdr_tcp_deliver,
+ qdr_tcp_get_credit,
+ qdr_tcp_delivery_update,
+ qdr_tcp_conn_close,
+ qdr_tcp_conn_trace);
+ adaptor->log_source = qd_log_source("TCP_ADAPTOR");
+ DEQ_INIT(adaptor->listeners);
+ DEQ_INIT(adaptor->connectors);
+ *adaptor_context = adaptor;
+
+ tcp_adaptor = adaptor;
+}
+
+
+static void qdr_tcp_adaptor_final(void *adaptor_context)
+{
+ qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context;
+ qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+ free(adaptor);
+ tcp_adaptor = NULL;
+}
+
+/**
+ * Declare the adaptor so that it will self-register on process startup.
+ */
+QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final)
diff --git a/src/adaptors/tcp_adaptor.h b/src/adaptors/tcp_adaptor.h
new file mode 100644
index 0000000..17bd7f9
--- /dev/null
+++ b/src/adaptors/tcp_adaptor.h
@@ -0,0 +1,79 @@
+#ifndef __tcp_adaptor_h__
+#define __tcp_adaptor_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/atomic.h>
+#include <qpid/dispatch/enum.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <proton/engine.h>
+#include <proton/event.h>
+#include <proton/ssl.h>
+
+#include "dispatch_private.h"
+#include "timer_private.h"
+
+typedef struct qd_tcp_listener_t qd_tcp_listener_t;
+typedef struct qd_tcp_connector_t qd_tcp_connector_t;
+typedef struct qd_bridge_config_t qd_bridge_config_t;
+
+struct qd_bridge_config_t
+{
+ char *name;
+ char *address;
+ char *host;
+ char *port;
+ char *site_id;
+ char *host_port;
+};
+
+struct qd_tcp_listener_t
+{
+ qd_handler_context_t context;
+ /* May be referenced by connection_manager and pn_listener_t */
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ qd_bridge_config_t config;
+ pn_listener_t *pn_listener;
+
+ DEQ_LINKS(qd_tcp_listener_t);
+};
+
+DEQ_DECLARE(qd_tcp_listener_t, qd_tcp_listener_list_t);
+ALLOC_DECLARE(qd_tcp_listener_t);
+
+struct qd_tcp_connector_t
+{
+ /* May be referenced by connection_manager, timer and pn_connection_t */
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ qd_bridge_config_t config;
+ void *dispatcher;
+
+ DEQ_LINKS(qd_tcp_connector_t);
+};
+
+DEQ_DECLARE(qd_tcp_connector_t, qd_tcp_connector_list_t);
+ALLOC_DECLARE(qd_tcp_connector_t);
+
+#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org