You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:57:58 UTC
[qpid-dispatch] 12/32: Dataplane: Updated the reference adaptor to implement connection activation
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 9545f352aa13032c3433d704abe08a45d7bf5919
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jun 3 15:18:26 2020 -0400
Dataplane: Updated the reference adaptor to implement connection activation
---
src/adaptors/reference_adaptor.c | 150 +++++++++++++++++++++------------------
1 file changed, 79 insertions(+), 71 deletions(-)
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 7bbc45d..b4d4afa 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -28,8 +28,8 @@
typedef struct qdr_ref_adaptor_t {
qdr_core_t *core;
qdr_protocol_adaptor_t *adaptor;
- qd_timer_t *timer;
- int sequence;
+ qd_timer_t *startup_timer;
+ qd_timer_t *activate_timer;
qdr_connection_t *conn;
qdr_link_t *out_link;
qdr_link_t *in_link;
@@ -39,11 +39,15 @@ typedef struct qdr_ref_adaptor_t {
void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn)
{
//
- // Don't do this here, use a zero-length timer to defer to an IO thread.
+ // Use a zero-delay timer to defer this call to an IO thread
+ //
+ // Note that this may not be generally safe to do. There's no guarantee that multiple
+ // activations won't schedule multiple IO threads running this code concurrently.
+ // Normally, we would rely on assurances provided by the IO scheduler (Proton) that no
+ // connection shall ever be served by more than one thread concurrently.
//
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
-
- while (qdr_connection_process(adaptor->conn)) {}
+ qd_timer_schedule(adaptor->activate_timer, 0);
}
@@ -162,70 +166,72 @@ static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace
}
-static void on_timer(void *context)
+static void on_startup(void *context)
{
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
- if (adaptor->sequence == 0) {
- qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
- false, //bool is_authenticated,
- true, //bool opened,
- "", //char *sasl_mechanisms,
- QD_INCOMING, //qd_direction_t dir,
- "127.0.0.1:47756", //const char *host,
- "", //const char *ssl_proto,
- "", //const char *ssl_cipher,
- "", //const char *user,
- "", //const char *container,
- pn_data(0), //pn_data_t *connection_properties,
- 0, //int ssl_ssf,
- false, //bool ssl,
- // set if remote is a qdrouter
- 0); //const qdr_router_version_t *version)
-
- adaptor->conn = qdr_connection_opened(adaptor->core,
- adaptor->adaptor,
- true,
- QDR_ROLE_NORMAL,
- 1,
- 10000, // get this from qd_connection_t
- 0,
- 0,
- false,
- false,
- false,
- false,
- 250,
- 0,
- info,
- 0,
- 0);
-
- uint64_t link_id;
- qdr_terminus_t *dynamic_source = qdr_terminus(0);
- qdr_terminus_set_dynamic(dynamic_source);
- qdr_terminus_t *target = qdr_terminus(0);
- qdr_terminus_set_address(target, "echo-service");
-
- adaptor->out_link = qdr_link_first_attach(adaptor->conn,
- QD_INCOMING,
- qdr_terminus(0), //qdr_terminus_t *source,
- target, //qdr_terminus_t *target,
- "ref.1", //const char *name,
- 0, //const char *terminus_addr,
- &link_id);
- adaptor->in_link = qdr_link_first_attach(adaptor->conn,
- QD_OUTGOING,
- dynamic_source, //qdr_terminus_t *source,
- qdr_terminus(0), //qdr_terminus_t *target,
- "ref.2", //const char *name,
- 0, //const char *terminus_addr,
- &link_id);
- adaptor->sequence++;
- }
+ qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
+ false, //bool is_authenticated,
+ true, //bool opened,
+ "", //char *sasl_mechanisms,
+ QD_INCOMING, //qd_direction_t dir,
+ "127.0.0.1:47756", //const char *host,
+ "", //const char *ssl_proto,
+ "", //const char *ssl_cipher,
+ "", //const char *user,
+ "", //const char *container,
+ pn_data(0), //pn_data_t *connection_properties,
+ 0, //int ssl_ssf,
+ false, //bool ssl,
+ // set if remote is a qdrouter
+ 0); //const qdr_router_version_t *version)
+
+ adaptor->conn = qdr_connection_opened(adaptor->core,
+ adaptor->adaptor,
+ true,
+ QDR_ROLE_NORMAL,
+ 1,
+ 10000, // get this from qd_connection_t
+ 0,
+ 0,
+ false,
+ false,
+ false,
+ false,
+ 250,
+ 0,
+ info,
+ 0,
+ 0);
+
+ uint64_t link_id;
+ qdr_terminus_t *dynamic_source = qdr_terminus(0);
+ qdr_terminus_set_dynamic(dynamic_source);
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, "echo-service");
+
+ adaptor->out_link = qdr_link_first_attach(adaptor->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "ref.1", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+ adaptor->in_link = qdr_link_first_attach(adaptor->conn,
+ QD_OUTGOING,
+ dynamic_source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "ref.2", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+}
+
- qd_timer_schedule(adaptor->timer, 1000);
- qdr_connection_process(adaptor->conn);
+static void on_activate(void *context)
+{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+
+ while (qdr_connection_process(adaptor->conn)) {}
}
@@ -236,7 +242,7 @@ static void on_timer(void *context)
* 1) Register the protocol adaptor with the router-core.
* 2) Prepare the protocol adaptor to be configured.
*/
-static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
+void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t);
adaptor->core = core;
@@ -260,17 +266,19 @@ static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
*adaptor_context = adaptor;
// TEMPORARY //
- adaptor->timer = qd_timer(core->qd, on_timer, adaptor);
- adaptor->sequence = 0;
- qd_timer_schedule(adaptor->timer, 0);
+ adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor);
+ qd_timer_schedule(adaptor->startup_timer, 0);
+
+ adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor);
}
-static void qdr_ref_adaptor_final(void *adaptor_context)
+void qdr_ref_adaptor_final(void *adaptor_context)
{
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context;
qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
- qd_timer_free(adaptor->timer);
+ qd_timer_free(adaptor->startup_timer);
+ qd_timer_free(adaptor->activate_timer);
free(adaptor);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org