You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2019/03/01 19:55:10 UTC

[qpid-dispatch] branch schedule-zero created (now 9c2371e)

This is an automated email from the ASF dual-hosted git repository.

aconway pushed a change to branch schedule-zero
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


      at 9c2371e  DISPATCH-1274: Separate qd_timer_t lock from server

This branch includes the following new commits:

     new de5c509  DISPATCH-1274: Optimize qd_timer_schedule(0)
     new 9c2371e  DISPATCH-1274: Separate qd_timer_t lock from server

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/02: DISPATCH-1274: Optimize qd_timer_schedule(0)

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aconway pushed a commit to branch schedule-zero
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit de5c509d073f279a5b66668b38016e8555b13e7a
Author: Alan Conway <ac...@redhat.com>
AuthorDate: Fri Mar 1 10:49:54 2019 -0500

    DISPATCH-1274: Optimize qd_timer_schedule(0)
    
    Introduced qd_immediate_t, a simpler scheduler for immediate requests.
    qd_timer_schedule delegates schedule(0) requests to it.
---
 src/CMakeLists.txt                           |  1 +
 src/immediate.c                              | 96 ++++++++++++++++++++++++++++
 src/{timer_private.h => immediate_private.h} | 39 +++++------
 src/server.c                                 | 20 ++++--
 src/server_private.h                         |  1 +
 src/timer.c                                  | 15 ++++-
 src/timer_private.h                          |  2 +
 7 files changed, 150 insertions(+), 24 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1544db9..cb8ad70 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
   entity_cache.c
   failoverlist.c
   hash.c
+  immediate.c
   iterator.c
   log.c
   message.c
diff --git a/src/immediate.c b/src/immediate.c
new file mode 100644
index 0000000..6149be7
--- /dev/null
+++ b/src/immediate.c
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "immediate_private.h"
+#include "server_private.h"
+
+#include <qpid/dispatch/threading.h>
+#include <assert.h>
+
+/* One immediate action: a callback that qd_immediate_visit() runs while the object is armed. */
+struct qd_immediate_t {
+    qd_server_t *server; /* Interrupted by qd_immediate_arm(); NULL if created with a NULL qd_dispatch_t */
+    void (*handler)(void* context); /* Called by qd_immediate_visit() when armed */
+    void *context; /* Passed unchanged to handler */
+    bool armed; /* Set by arm; cleared by disarm and by visit just before it calls handler */
+};
+
+/* Array rather than list for fast access and cache-coherence */
+static qd_immediate_t immediates[256] = {0};
+/* Number of slots of immediates[] handed out by qd_immediate(); qd_immediate_free() does not return slots to the pool. */
+static size_t count = 0;
+/* Created by qd_immediate_initialize(); held while reading or writing count and the armed flags. */
+static sys_mutex_t *lock = NULL;
+
+/*
+ * Create the mutex used by the other functions in this file. They lock it
+ * unconditionally, so this has to run before any of them is called.
+ */
+void qd_immediate_initialize(void) {
+    lock = sys_mutex();
+}
+
+/*
+ * Release the module lock and empty the slot pool.
+ *
+ * count is reset so that a later qd_immediate_initialize() starts from an
+ * empty pool. Slots are never handed back individually, so without the reset
+ * a process that initializes and finalizes repeatedly would exhaust the
+ * fixed-size array, and qd_immediate_visit() would keep scanning slots left
+ * over from the previous lifetime.
+ */
+void qd_immediate_finalize(void) {
+    sys_mutex_free(lock);
+    lock = NULL;
+    count = 0;
+}
+
+/*
+ * Allocate the next unused slot and bind it to handler/context.
+ *
+ * qd:      dispatch whose server is interrupted when the object is armed;
+ *          may be NULL, in which case arming never interrupts a server.
+ * handler: callback run by qd_immediate_visit() while the object is armed.
+ * context: passed unchanged to handler.
+ *
+ * Returns the new object, initially disarmed, or NULL when every slot has
+ * already been handed out (builds with assertions enabled abort instead).
+ */
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context) {
+    qd_immediate_t *i = NULL;
+    sys_mutex_lock(lock);
+    if (count < sizeof(immediates)/sizeof(immediates[0])) {
+        i = &immediates[count++];
+        i->server = qd ? qd->server : NULL;
+        i->handler = handler;
+        i->context = context;
+        i->armed = false;
+    }
+    /* Release the lock on every path, including when the pool is exhausted. */
+    sys_mutex_unlock(lock);
+    assert(i != NULL && "exceeded max number of qd_immediate_t objects");
+    return i;
+}
+
+/*
+ * Request a handler call. If the object was not already armed it is marked
+ * armed and, when it has a server, that server is interrupted. Arming an
+ * object that is already armed changes nothing and sends no interrupt.
+ */
+void qd_immediate_arm(qd_immediate_t *i) {
+    sys_mutex_lock(lock);
+    const bool newly_armed = !i->armed;
+    i->armed = true;
+    sys_mutex_unlock(lock);
+
+    /* Interrupt outside the lock, and only on the disarmed -> armed transition. */
+    if (newly_armed && i->server) {
+        qd_server_interrupt(i->server);
+    }
+}
+
+/*
+ * Clear the armed flag so that qd_immediate_visit() skips this object.
+ * NOTE(review): visit clears the flag and releases the lock before calling
+ * the handler, so a handler call that visit has already picked up may still
+ * be in progress when this function returns.
+ */
+void qd_immediate_disarm(qd_immediate_t *i) {
+    sys_mutex_lock(lock);
+    i->armed = false;
+    sys_mutex_unlock(lock);
+}
+
+/* Retire an object. Its slot stays in the static array; nothing here hands it back for reuse. */
+void qd_immediate_free(qd_immediate_t *i) {
+    /* Just disarm, it's harmless to leave it in place. */
+    qd_immediate_disarm(i);
+}
+
+/*
+ * Make one pass over the allocated slots and call the handler of each one
+ * that is armed.
+ *
+ * The armed flag is cleared before the handler runs and the lock is released
+ * around the call, so a handler can call back into this module (for example
+ * to re-arm) without this function holding the lock. count is re-read on
+ * every iteration, so slots allocated while a handler runs are visited too.
+ */
+void qd_immediate_visit(void) {
+    sys_mutex_lock(lock);
+    for (qd_immediate_t *i = immediates; i < immediates + count; ++i) {
+        if (i->armed) {
+            i->armed = false;
+            sys_mutex_unlock(lock);
+            i->handler(i->context);
+            sys_mutex_lock(lock);
+        }
+    }
+    sys_mutex_unlock(lock);
+}
diff --git a/src/timer_private.h b/src/immediate_private.h
similarity index 52%
copy from src/timer_private.h
copy to src/immediate_private.h
index 537eb4b..cd8d11b 100644
--- a/src/timer_private.h
+++ b/src/immediate_private.h
@@ -1,5 +1,5 @@
-#ifndef __timer_private_h__
-#define __timer_private_h__ 1
+#ifndef __immediate_private_h__
+#define __immediate_private_h__ 1
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,26 +19,27 @@
  * under the License.
  */
 
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/timer.h>
-#include <qpid/dispatch/threading.h>
 
-struct qd_timer_t {
-    DEQ_LINKS(qd_timer_t);
-    qd_server_t      *server;
-    qd_timer_cb_t     handler;
-    void             *context;
-    qd_timestamp_t    delta_time;
-    bool              scheduled; /* true means on scheduled list, false on idle list */
-};
+#include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/server.h>
+#include <stdint.h>
 
-DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
+/* Immediate actions - used by timer to optimize schedule(0) */
 
-void qd_timer_initialize(sys_mutex_t *server_lock);
-void qd_timer_finalize(void);
-void qd_timer_visit();
+void qd_immediate_initialize(void);
+void qd_immediate_finalize(void);
+void qd_immediate_visit(void);
 
-/// For tests only
-sys_mutex_t* qd_timer_lock();
+typedef struct qd_immediate_t qd_immediate_t;
+
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context);
+
+/* Arm causes a call to handler(context) ASAP in a server thread. */
+void qd_immediate_arm(qd_immediate_t *);
+
+/* After disarm() returns, there will be no handler() call unless re-armed. */
+void qd_immediate_disarm(qd_immediate_t *);
+
+void qd_immediate_free(qd_immediate_t *);
 
 #endif
diff --git a/src/server.c b/src/server.c
index 24add4e..1863546 100644
--- a/src/server.c
+++ b/src/server.c
@@ -38,6 +38,7 @@
 #include "entity.h"
 #include "entity_cache.h"
 #include "dispatch_private.h"
+#include "immediate_private.h"
 #include "policy.h"
 #include "server_private.h"
 #include "timer_private.h"
@@ -68,6 +69,7 @@ struct qd_server_t {
     uint64_t                  next_connection_id;
     void                     *py_displayname_obj;
     qd_http_server_t         *http;
+    bool                      stopping;
 };
 
 #define HEARTBEAT_INTERVAL 1000
@@ -905,10 +907,15 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co
     switch (pn_event_type(e)) {
 
     case PN_PROACTOR_INTERRUPT:
-        /* Interrupt the next thread */
-        pn_proactor_interrupt(qd_server->proactor);
-        /* Stop the current thread */
-        return false;
+        if (qd_server->stopping) {
+            /* Interrupt the next thread */
+            pn_proactor_interrupt(qd_server->proactor);
+            /* Stop the current thread */
+            return false;
+        } else {
+            /* Check for immediate tasks */
+            qd_immediate_visit();
+        }
 
     case PN_PROACTOR_TIMEOUT:
         qd_timer_visit();
@@ -1296,6 +1303,7 @@ void qd_server_run(qd_dispatch_t *qd)
 
 void qd_server_stop(qd_dispatch_t *qd)
 {
+    qd->server->stopping = true;
     /* Interrupt the proactor, async-signal-safe */
     pn_proactor_interrupt(qd->server->proactor);
 }
@@ -1505,6 +1513,10 @@ void qd_server_timeout(qd_server_t *server, qd_duration_t duration) {
     pn_proactor_set_timeout(server->proactor, duration);
 }
 
+/* Interrupt the server's proactor by calling pn_proactor_interrupt() on it. */
+void qd_server_interrupt(qd_server_t *server) {
+    pn_proactor_interrupt(server->proactor);
+}
+
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
 
 const char* qd_connection_name(const qd_connection_t *c) {
diff --git a/src/server_private.h b/src/server_private.h
index 9f3c75c..9fcec6c 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -38,6 +38,7 @@
 
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
 void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
+void qd_server_interrupt(qd_server_t *server);
 
 qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config);
 
diff --git a/src/timer.c b/src/timer.c
index a4aae2a..0fd87c7 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -18,8 +18,9 @@
  */
 
 #include "dispatch_private.h"
-#include "timer_private.h"
+#include "immediate_private.h"
 #include "server_private.h"
+#include "timer_private.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/alloc.h>
@@ -54,6 +55,7 @@ static void timer_cancel_LH(qd_timer_t *timer)
         DEQ_INSERT_TAIL(idle_timers, timer);
         timer->scheduled = false;
     }
+    qd_immediate_disarm(timer->immediate);
 }
 
 /* Adjust timer's time_base and delays for the current time. */
@@ -95,6 +97,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
     timer->context    = context;
     timer->delta_time = 0;
     timer->scheduled  = false;
+    timer->immediate  = qd_immediate(qd, cb, context);
     sys_mutex_lock(lock);
     DEQ_INSERT_TAIL(idle_timers, timer);
     sys_mutex_unlock(lock);
@@ -109,6 +112,7 @@ void qd_timer_free(qd_timer_t *timer)
     sys_mutex_lock(lock);
     timer_cancel_LH(timer);
     DEQ_REMOVE(idle_timers, timer);
+    qd_immediate_free(timer->immediate);
     sys_mutex_unlock(lock);
     free_qd_timer_t(timer);
 }
@@ -124,6 +128,11 @@ qd_timestamp_t qd_timer_now() {
 void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration)
 {
     sys_mutex_lock(lock);
+    if (duration == 0) {
+        qd_immediate_arm(timer->immediate);
+        sys_mutex_unlock(lock);
+        return;
+    }
     timer_cancel_LH(timer);  // Timer is now on the idle list
     DEQ_REMOVE(idle_timers, timer);
 
@@ -175,6 +184,7 @@ void qd_timer_cancel(qd_timer_t *timer)
 
 void qd_timer_initialize(sys_mutex_t *server_lock)
 {
+    qd_immediate_initialize();
     lock = server_lock;
     DEQ_INIT(idle_timers);
     DEQ_INIT(scheduled_timers);
@@ -185,6 +195,7 @@ void qd_timer_initialize(sys_mutex_t *server_lock)
 void qd_timer_finalize(void)
 {
     lock = 0;
+    qd_immediate_finalize();
 }
 
 
@@ -196,6 +207,7 @@ void qd_timer_visit()
     qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
     while (timer && timer->delta_time == 0) {
         timer_cancel_LH(timer); /* Removes timer from scheduled_timers */
+        qd_immediate_disarm(timer->immediate);
         sys_mutex_unlock(lock);
         timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule */
         sys_mutex_lock(lock);
@@ -206,4 +218,5 @@ void qd_timer_visit()
         qd_server_timeout(first->server, first->delta_time);
     }
     sys_mutex_unlock(lock);
+    qd_immediate_visit();
 }
diff --git a/src/timer_private.h b/src/timer_private.h
index 537eb4b..263fca5 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+#include "immediate_private.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/threading.h>
@@ -29,6 +30,7 @@ struct qd_timer_t {
     qd_timer_cb_t     handler;
     void             *context;
     qd_timestamp_t    delta_time;
+    qd_immediate_t   *immediate; /* Optimized path for schedule(0) */
     bool              scheduled; /* true means on scheduled list, false on idle list */
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/02: DISPATCH-1274: Separate qd_timer_t lock from server

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aconway pushed a commit to branch schedule-zero
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9c2371e9628aee40356fbea36e3b53493b0a0853
Author: Alan Conway <ac...@redhat.com>
AuthorDate: Fri Mar 1 14:42:38 2019 -0500

    DISPATCH-1274: Separate qd_timer_t lock from server
    
    qd_timer_t was sharing a lock with qd_server_t for historical reasons.
    Code inspection shows there is no need for the sharing.
    
    - timer.c calls qd_server_timeout() which is thread-safe, and no other server functions.
    - timer calls handlers outside the lock so it doesn't matter what lock is used.
---
 src/server.c        | 2 +-
 src/timer.c         | 5 +++--
 src/timer_private.h | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/server.c b/src/server.c
index 1863546..760126d 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1227,7 +1227,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
     qd_server->cond             = sys_cond();
     DEQ_INIT(qd_server->conn_list);
 
-    qd_timer_initialize(qd_server->lock);
+    qd_timer_initialize();
 
     qd_server->pause_requests         = 0;
     qd_server->threads_paused         = 0;
diff --git a/src/timer.c b/src/timer.c
index 0fd87c7..c76f77d 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -182,10 +182,10 @@ void qd_timer_cancel(qd_timer_t *timer)
 //=========================================================================
 
 
-void qd_timer_initialize(sys_mutex_t *server_lock)
+void qd_timer_initialize()
 {
     qd_immediate_initialize();
-    lock = server_lock;
+    lock = sys_mutex();
     DEQ_INIT(idle_timers);
     DEQ_INIT(scheduled_timers);
     time_base = 0;
@@ -194,6 +194,7 @@ void qd_timer_initialize(sys_mutex_t *server_lock)
 
 void qd_timer_finalize(void)
 {
+    sys_mutex_free(lock);
     lock = 0;
     qd_immediate_finalize();
 }
diff --git a/src/timer_private.h b/src/timer_private.h
index 263fca5..9f6f1cb 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -36,7 +36,7 @@ struct qd_timer_t {
 
 DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
 
-void qd_timer_initialize(sys_mutex_t *server_lock);
+void qd_timer_initialize(void);
 void qd_timer_finalize(void);
 void qd_timer_visit();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org