You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:57:58 UTC

[qpid-dispatch] 12/32: Dataplane: Updated the reference adaptor to implement connection activation

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9545f352aa13032c3433d704abe08a45d7bf5919
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jun 3 15:18:26 2020 -0400

    Dataplane: Updated the reference adaptor to implement connection activation
---
 src/adaptors/reference_adaptor.c | 150 +++++++++++++++++++++------------------
 1 file changed, 79 insertions(+), 71 deletions(-)

diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 7bbc45d..b4d4afa 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -28,8 +28,8 @@
 typedef struct qdr_ref_adaptor_t {
     qdr_core_t             *core;
     qdr_protocol_adaptor_t *adaptor;
-    qd_timer_t             *timer;
-    int                     sequence;
+    qd_timer_t             *startup_timer;
+    qd_timer_t             *activate_timer;
     qdr_connection_t       *conn;
     qdr_link_t             *out_link;
     qdr_link_t             *in_link;
@@ -39,11 +39,15 @@ typedef struct qdr_ref_adaptor_t {
 void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn)
 {
     //
-    // Don't do this here, use a zero-length timer to defer to an IO thread.
+    // Use a zero-delay timer to defer connection processing to an IO thread.
+    //
+    // Note that this approach is not generally safe.  Nothing here guarantees that multiple
+    // activations won't cause multiple IO threads to run the timer handler concurrently.
+    // Normally, we would rely on assurances provided by the IO scheduler (Proton) that no
+    // connection shall ever be served by more than one thread concurrently.
     //
     qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
-
-    while (qdr_connection_process(adaptor->conn)) {}
+    qd_timer_schedule(adaptor->activate_timer, 0);
 }
 
 
@@ -162,70 +166,72 @@ static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace
 }
 
 
-static void on_timer(void *context)
+static void on_startup(void *context)
 {
     qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
 
-    if (adaptor->sequence == 0) {
-        qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
-                                                          false, //bool             is_authenticated,
-                                                          true,  //bool             opened,
-                                                          "",   //char            *sasl_mechanisms,
-                                                          QD_INCOMING, //qd_direction_t   dir,
-                                                          "127.0.0.1:47756",    //const char      *host,
-                                                          "",    //const char      *ssl_proto,
-                                                          "",    //const char      *ssl_cipher,
-                                                          "",    //const char      *user,
-                                                          "",    //const char      *container,
-                                                          pn_data(0),     //pn_data_t       *connection_properties,
-                                                          0,     //int              ssl_ssf,
-                                                          false, //bool             ssl,
-                                                          // set if remote is a qdrouter
-                                                          0);    //const qdr_router_version_t *version)
-
-        adaptor->conn = qdr_connection_opened(adaptor->core,
-                                              adaptor->adaptor,
-                                              true,
-                                              QDR_ROLE_NORMAL,
-                                              1,
-                                              10000,  // get this from qd_connection_t
-                                              0,
-                                              0,
-                                              false,
-                                              false,
-                                              false,
-                                              false,
-                                              250,
-                                              0,
-                                              info,
-                                              0,
-                                              0);
-
-        uint64_t link_id;
-        qdr_terminus_t *dynamic_source = qdr_terminus(0);
-        qdr_terminus_set_dynamic(dynamic_source);
-        qdr_terminus_t *target = qdr_terminus(0);
-        qdr_terminus_set_address(target, "echo-service");
-
-        adaptor->out_link = qdr_link_first_attach(adaptor->conn,
-                                                  QD_INCOMING,
-                                                  qdr_terminus(0),  //qdr_terminus_t   *source,
-                                                  target,           //qdr_terminus_t   *target,
-                                                  "ref.1",          //const char       *name,
-                                                  0,                //const char       *terminus_addr,
-                                                  &link_id);
-        adaptor->in_link = qdr_link_first_attach(adaptor->conn,
-                                                 QD_OUTGOING,
-                                                 dynamic_source,   //qdr_terminus_t   *source,
-                                                 qdr_terminus(0),  //qdr_terminus_t   *target,
-                                                 "ref.2",          //const char       *name,
-                                                 0,                //const char       *terminus_addr,
-                                                 &link_id);
-        adaptor->sequence++;
-    }
+    qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
+                                                      false, //bool             is_authenticated,
+                                                      true,  //bool             opened,
+                                                      "",   //char            *sasl_mechanisms,
+                                                      QD_INCOMING, //qd_direction_t   dir,
+                                                      "127.0.0.1:47756",    //const char      *host,
+                                                      "",    //const char      *ssl_proto,
+                                                      "",    //const char      *ssl_cipher,
+                                                      "",    //const char      *user,
+                                                      "",    //const char      *container,
+                                                      pn_data(0),     //pn_data_t       *connection_properties,
+                                                      0,     //int              ssl_ssf,
+                                                      false, //bool             ssl,
+                                                      // set if remote is a qdrouter
+                                                      0);    //const qdr_router_version_t *version)
+
+    adaptor->conn = qdr_connection_opened(adaptor->core,
+                                          adaptor->adaptor,
+                                          true,
+                                          QDR_ROLE_NORMAL,
+                                          1,
+                                          10000,  // get this from qd_connection_t
+                                          0,
+                                          0,
+                                          false,
+                                          false,
+                                          false,
+                                          false,
+                                          250,
+                                          0,
+                                          info,
+                                          0,
+                                          0);
+
+    uint64_t link_id;
+    qdr_terminus_t *dynamic_source = qdr_terminus(0);
+    qdr_terminus_set_dynamic(dynamic_source);
+    qdr_terminus_t *target = qdr_terminus(0);
+    qdr_terminus_set_address(target, "echo-service");
+
+    adaptor->out_link = qdr_link_first_attach(adaptor->conn,
+                                              QD_INCOMING,
+                                              qdr_terminus(0),  //qdr_terminus_t   *source,
+                                              target,           //qdr_terminus_t   *target,
+                                              "ref.1",          //const char       *name,
+                                              0,                //const char       *terminus_addr,
+                                              &link_id);
+    adaptor->in_link = qdr_link_first_attach(adaptor->conn,
+                                             QD_OUTGOING,
+                                             dynamic_source,   //qdr_terminus_t   *source,
+                                             qdr_terminus(0),  //qdr_terminus_t   *target,
+                                             "ref.2",          //const char       *name,
+                                             0,                //const char       *terminus_addr,
+                                             &link_id);
+}
+
 
-    qd_timer_schedule(adaptor->timer, 1000);
-    qdr_connection_process(adaptor->conn);
+static void on_activate(void *context)
+{
+    qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+
+    while (qdr_connection_process(adaptor->conn)) {}
 }
 
 
@@ -236,7 +242,7 @@ static void on_timer(void *context)
  *   1) Register the protocol adaptor with the router-core.
  *   2) Prepare the protocol adaptor to be configured.
  */
-static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
+void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
 {
     qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t);
     adaptor->core    = core;
@@ -260,17 +266,19 @@ static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
     *adaptor_context = adaptor;
 
     // TEMPORARY //
-    adaptor->timer    = qd_timer(core->qd, on_timer, adaptor);
-    adaptor->sequence = 0;
-    qd_timer_schedule(adaptor->timer, 0);
+    adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor);
+    qd_timer_schedule(adaptor->startup_timer, 0);
+
+    adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor);
 }
 
 
-static void qdr_ref_adaptor_final(void *adaptor_context)
+void qdr_ref_adaptor_final(void *adaptor_context)
 {
     qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context;
     qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
-    qd_timer_free(adaptor->timer);
+    qd_timer_free(adaptor->startup_timer);
+    qd_timer_free(adaptor->activate_timer);
     free(adaptor);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org