You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:57:56 UTC
[qpid-dispatch] 10/32: Dataplane: Renamed tcp_adaptor to
reference_adaptor. Added more test content to the reference adaptor. It now
sends messages to a fixed address. Fixed qdr_terminus_format to show the
dynamically-assigned address for dynamis termini.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 3b58b218cce9b66cc2f291710724ea377d7e9bbb
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jun 3 14:43:22 2020 -0400
Dataplane: Renamed tcp_adaptor to reference_adaptor. Added more test content to the reference adaptor. It now sends messages to a fixed address. Fixed qdr_terminus_format to show the dynamically-assigned address for dynamis termini.
---
src/CMakeLists.txt | 2 +-
src/adaptors/reference_adaptor.c | 280 +++++++++++++++++++++++++++++++++++++++
src/adaptors/tcp_adaptor.c | 145 --------------------
src/router_core/terminus.c | 12 +-
4 files changed, 289 insertions(+), 150 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8c0c7f7..37a61d1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,7 +38,7 @@ add_custom_command (
# Build the qpid-dispatch library.
set(qpid_dispatch_SOURCES
- adaptors/tcp_adaptor.c
+ adaptors/reference_adaptor.c
alloc_pool.c
amqp.c
bitmask.c
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
new file mode 100644
index 0000000..7bbc45d
--- /dev/null
+++ b/src/adaptors/reference_adaptor.c
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/protocol_adaptor.h"
+#include "delivery.h"
+#include "qpid/dispatch/timer.h"
+#include "qpid/dispatch/message.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+typedef struct qdr_ref_adaptor_t {
+ qdr_core_t *core;
+ qdr_protocol_adaptor_t *adaptor;
+ qd_timer_t *timer;
+ int sequence;
+ qdr_connection_t *conn;
+ qdr_link_t *out_link;
+ qdr_link_t *in_link;
+} qdr_ref_adaptor_t;
+
+
+void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn)
+{
+ //
+ // Don't do this here, use a zero-length timer to defer to an IO thread.
+ //
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+
+ while (qdr_connection_process(adaptor->conn)) {}
+}
+
+
+static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target,
+ qd_session_class_t session_class)
+{
+}
+
+
+static void qdr_ref_second_attach(void *context, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target)
+{
+ char ftarget[100];
+ char fsource[100];
+
+ ftarget[0] = '\0';
+ fsource[0] = '\0';
+
+ if (!!source) {
+ size_t size = 100;
+ qdr_terminus_format(source, fsource, &size);
+ }
+
+ if (!!target) {
+ size_t size = 100;
+ qdr_terminus_format(target, ftarget, &size);
+ }
+
+ printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget);
+}
+
+
+static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
+{
+}
+
+
+static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
+{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+ qd_buffer_list_t buffers;
+ qd_buffer_t *buf;
+
+ printf("qdr_ref_flow: %d credits issued\n", credit);
+
+ qd_message_t *msg = qd_message();
+ DEQ_INIT(buffers);
+ buf = qd_buffer();
+ char *insert = (char*) qd_buffer_cursor(buf);
+ strcpy(insert, "Test Payload");
+ qd_buffer_insert(buf, 13);
+ DEQ_INSERT_HEAD(buffers, buf);
+ qd_message_compose_1(msg, "echo-service", &buffers);
+
+ qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
+}
+
+
+static void qdr_ref_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qdr_ref_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qdr_ref_drain(void *context, qdr_link_t *link, bool mode)
+{
+}
+
+
+static int qdr_ref_push(void *context, qdr_link_t *link, int limit)
+{
+ return 0;
+}
+
+
+static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
+{
+ return 0;
+}
+
+
+static int qdr_ref_get_credit(void *context, qdr_link_t *link)
+{
+ return 0;
+}
+
+
+static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+ char *dispname;
+
+ switch (disp) {
+ case PN_ACCEPTED: dispname = "ACCEPTED"; break;
+ case PN_REJECTED: dispname = "REJECTED"; break;
+ case PN_RELEASED: dispname = "RELEASED"; break;
+ case PN_MODIFIED: dispname = "MODIFIED"; break;
+ default:
+ dispname = "<UNKNOWN>";
+ }
+ printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false");
+}
+
+
+static void qdr_ref_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
+{
+}
+
+
+static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace)
+{
+}
+
+
+static void on_timer(void *context)
+{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+
+ if (adaptor->sequence == 0) {
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_INCOMING, //qd_direction_t dir,
+ "127.0.0.1:47756", //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ adaptor->conn = qdr_connection_opened(adaptor->core,
+ adaptor->adaptor,
+ true,
+ QDR_ROLE_NORMAL,
+ 1,
+ 10000, // get this from qd_connection_t
+ 0,
+ 0,
+ false,
+ false,
+ false,
+ false,
+ 250,
+ 0,
+ info,
+ 0,
+ 0);
+
+ uint64_t link_id;
+ qdr_terminus_t *dynamic_source = qdr_terminus(0);
+ qdr_terminus_set_dynamic(dynamic_source);
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, "echo-service");
+
+ adaptor->out_link = qdr_link_first_attach(adaptor->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "ref.1", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+ adaptor->in_link = qdr_link_first_attach(adaptor->conn,
+ QD_OUTGOING,
+ dynamic_source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "ref.2", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+ adaptor->sequence++;
+ }
+
+ qd_timer_schedule(adaptor->timer, 1000);
+ qdr_connection_process(adaptor->conn);
+}
+
+
+/**
+ * This initialization function will be invoked when the router core is ready for the protocol
+ * adaptor to be created. This function must:
+ *
+ * 1) Register the protocol adaptor with the router-core.
+ * 2) Prepare the protocol adaptor to be configured.
+ */
+static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
+{
+ qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t);
+ adaptor->core = core;
+ adaptor->adaptor = qdr_protocol_adaptor(core,
+ "reference", // name
+ adaptor, // context
+ qdr_ref_connection_activate_CT,
+ qdr_ref_first_attach,
+ qdr_ref_second_attach,
+ qdr_ref_detach,
+ qdr_ref_flow,
+ qdr_ref_offer,
+ qdr_ref_drained,
+ qdr_ref_drain,
+ qdr_ref_push,
+ qdr_ref_deliver,
+ qdr_ref_get_credit,
+ qdr_ref_delivery_update,
+ qdr_ref_conn_close,
+ qdr_ref_conn_trace);
+ *adaptor_context = adaptor;
+
+ // TEMPORARY //
+ adaptor->timer = qd_timer(core->qd, on_timer, adaptor);
+ adaptor->sequence = 0;
+ qd_timer_schedule(adaptor->timer, 0);
+}
+
+
+static void qdr_ref_adaptor_final(void *adaptor_context)
+{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context;
+ qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+ qd_timer_free(adaptor->timer);
+ free(adaptor);
+}
+
+/**
+ * Declare the adaptor so that it will self-register on process startup.
+ */
+QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
deleted file mode 100644
index ab5dc48..0000000
--- a/src/adaptors/tcp_adaptor.c
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/dispatch/ctools.h"
-#include "qpid/dispatch/protocol_adaptor.h"
-#include "delivery.h"
-#include <stdio.h>
-#include <inttypes.h>
-
-typedef struct qdr_tcp_adaptor_t {
- qdr_core_t *core;
- qdr_protocol_adaptor_t *adaptor;
-} qdr_tcp_adaptor_t;
-
-
-static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target,
- qd_session_class_t session_class)
-{
-}
-
-
-static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target)
-{
-}
-
-
-static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
-{
-}
-
-
-static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
-{
-}
-
-
-static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
-{
-}
-
-
-static void qdr_tcp_drained(void *context, qdr_link_t *link)
-{
-}
-
-
-static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
-{
-}
-
-
-static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
-{
- return 0;
-}
-
-
-static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
-{
- return 0;
-}
-
-
-static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
-{
- return 0;
-}
-
-
-static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
-{
-}
-
-
-static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
-{
-}
-
-
-static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace)
-{
-}
-
-
-/**
- * This initialization function will be invoked when the router core is ready for the protocol
- * adaptor to be created. This function must:
- *
- * 1) Register the protocol adaptor with the router-core.
- * 2) Prepare the protocol adaptor to be configured.
- */
-static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
-{
- qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t);
- adaptor->core = core;
- adaptor->adaptor = qdr_protocol_adaptor(core,
- "tcp", // name
- adaptor, // context
- 0, // activate
- qdr_tcp_first_attach,
- qdr_tcp_second_attach,
- qdr_tcp_detach,
- qdr_tcp_flow,
- qdr_tcp_offer,
- qdr_tcp_drained,
- qdr_tcp_drain,
- qdr_tcp_push,
- qdr_tcp_deliver,
- qdr_tcp_get_credit,
- qdr_tcp_delivery_update,
- qdr_tcp_conn_close,
- qdr_tcp_conn_trace);
- *adaptor_context = adaptor;
-}
-
-
-static void qdr_tcp_adaptor_final(void *adaptor_context)
-{
- qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context;
- qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
- free(adaptor);
-}
-
-/**
- * Declare the adaptor so that it will self-register on process startup.
- */
-QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final)
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 9674987..921baf8 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -92,13 +92,17 @@ void qdr_terminus_format(qdr_terminus_t *term, char *output, size_t *size)
break;
}
- if (term->dynamic)
- len = safe_snprintf(output, *size, "<dynamic>");
- else if (term->address && term->address->iterator) {
+ if (term->dynamic) {
+ len = safe_snprintf(output, *size, "(dyn)");
+ output += len;
+ *size -= len;
+ }
+
+ if (term->address && term->address->iterator) {
qd_iterator_reset_view(term->address->iterator, ITER_VIEW_ALL);
len = qd_iterator_ncopy(term->address->iterator, (unsigned char*) output, *size - 1);
output[len] = 0;
- } else if (term->address == 0)
+ } else
len = safe_snprintf(output, *size, "<none>");
output += len;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org