You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jd...@apache.org on 2019/11/14 14:08:13 UTC

[qpid-proton] branch master updated: PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)

This is an automated email from the ASF dual-hosted git repository.

jdanek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new aa02108  PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)
aa02108 is described below

commit aa021087a0de81c99041b325779480041d60e0e8
Author: Jiří Daněk <jd...@redhat.com>
AuthorDate: Thu Nov 14 15:08:03 2019 +0100

    PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)
    
    - deprecates pn_proactor_now in favour of new pn_proactor_now_64
    - replaces use of pn_i_now2 with pn_proactor_now_64
    - uses GetTickCount64() in iocp proactor, instead of GetSystemTimeAsFileTime
---
 c/include/proton/proactor.h  | 16 ++++++++++++++++
 c/include/proton/transport.h |  2 +-
 c/src/core/engine-internal.h |  2 +-
 c/src/core/transport.c       |  6 +++---
 c/src/proactor/epoll.c       | 21 +++++++++------------
 c/src/proactor/libuv.c       |  4 ++++
 c/src/proactor/win_iocp.c    | 37 +++++++++++--------------------------
 7 files changed, 45 insertions(+), 43 deletions(-)

diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index d4fbdd8..88544bc 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -299,6 +299,8 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
 PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 
 /**
+ * @deprecated Use ::pn_proactor_now_64()
+ *
  * Get the real elapsed time since an arbitrary point in the past in milliseconds.
  *
  * This may be used as a portable way to get a process-local timestamp for the
@@ -313,6 +315,20 @@ PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 PNP_EXTERN pn_millis_t pn_proactor_now(void);
 
 /**
+ * Get the real elapsed time since an arbitrary point in the past in milliseconds.
+ *
+ * This may be used as a portable way to get a process-local timestamp for the
+ * current time. It is monotonically increasing and will never go backwards.
+ *
+ * Note: this is not a suitable value for an AMQP timestamp to be sent as part
+ * of a message.  Such a timestamp should use the real time in milliseconds
+ * since the epoch.
+ *
+ * @note Thread-safe
+ */
+PNP_EXTERN int64_t pn_proactor_now_64(void);
+
+/**
  * @}
  */
 
diff --git a/c/include/proton/transport.h b/c/include/proton/transport.h
index dbde8b1..d7693e0 100644
--- a/c/include/proton/transport.h
+++ b/c/include/proton/transport.h
@@ -695,7 +695,7 @@ PN_EXTERN bool pn_transport_closed(pn_transport_t *transport);
  *
  * @param[in] transport the transport to process.
  * @param[in] now A monotonically-increasing time value in milliseconds.
- *   Does not need to be wall-clock time or a valid AMQP timestamp, but must increase montonically.
+ *   Does not need to be wall-clock time or a valid AMQP timestamp, but must increase monotonically.
  *
  * @return If non-zero, then the monotonic expiration time of the next pending
  * timer event for the transport.  The caller must invoke pn_transport_tick
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 1e56562..b59d1fc 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -104,7 +104,7 @@ typedef struct pn_io_layer_t {
   ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
   ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
   void (*handle_error)(struct pn_transport_t* transport, unsigned int layer);
-  pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+  int64_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, int64_t);
   size_t (*buffered_output)(struct pn_transport_t *transport);  // how much output is held
 } pn_io_layer_t;
 
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index f8a2d97..4dea853 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -146,7 +146,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer,
 static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
 static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
 static void pn_error_amqp(pn_transport_t *transport, unsigned int layer);
-static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now);
+static int64_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, int64_t now);
 
 static ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
 static ssize_t pn_io_layer_output_null(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
@@ -2628,7 +2628,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer,
 }
 
 /* process AMQP related timer events */
-static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now)
+static int64_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, int64_t now)
 {
   pn_timestamp_t timeout = 0;
 
@@ -2923,7 +2923,7 @@ pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
 
 int64_t pn_transport_tick(pn_transport_t *transport, int64_t now)
 {
-  pn_timestamp_t r = 0;
+  int64_t r = 0;
   for (int i = 0; i<PN_IO_LAYER_CT; ++i) {
     if (transport->io_layers[i] && transport->io_layers[i]->process_tick)
       r = pn_timestamp_min(r, transport->io_layers[i]->process_tick(transport, i, now));
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 677d6b6..a360ff0 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -199,7 +199,7 @@ static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
     // EPOLLIN is possible but not assured
     pt->in_doubt = true;
   }
-  pt->timer_active = t_millis;
+  pt->timer_active = t_millis != 0;
 }
 
 static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
@@ -262,13 +262,6 @@ static void ptimer_finalize(ptimer_t *pt) {
   pmutex_finalize(&pt->mutex);
 }
 
-pn_timestamp_t pn_i_now2(void)
-{
-  struct timespec now;
-  clock_gettime(CLOCK_REALTIME, &now);
-  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
-}
-
 // ========================================================================
 // Proactor common code
 // ========================================================================
@@ -562,7 +555,7 @@ typedef struct pconnection_t {
   epoll_extended_t *rearm_target;    /* main or secondary epollfd */
 } pconnection_t;
 
-/* Protects read/update of pn_connnection_t pointer to it's pconnection_t
+/* Protects read/update of pn_connection_t pointer to its pconnection_t
  *
  * Global because pn_connection_wake()/pn_connection_proactor() navigate from
  * the pn_connection_t before we know the proactor or driver. Critical sections
@@ -588,7 +581,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
 }
 
 /*
- * A listener can have mutiple sockets (as specified in the addrinfo).  They
+ * A listener can have multiple sockets (as specified in the addrinfo).  They
  * are armed separately.  The individual psockets can be part of at most one
  * list: the global proactor overflow retry list or the per-listener list of
  * pending accepts (valid inbound socket obtained, but pn_listener_accept not
@@ -1426,7 +1419,7 @@ static void pconnection_tick(pconnection_t *pc) {
   pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
     ptimer_set(&pc->timer, 0);
-    uint64_t now = pn_i_now2();
+    uint64_t now = pn_proactor_now_64();
     uint64_t next = pn_transport_tick(t, now);
     if (next) {
       ptimer_set(&pc->timer, next - now);
@@ -2341,7 +2334,11 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
 }
 
 pn_millis_t pn_proactor_now(void) {
+  return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
   struct timespec t;
   clock_gettime(CLOCK_MONOTONIC, &t);
-  return t.tv_sec*1000 + t.tv_nsec/1000000;
+  return t.tv_sec * 1000 + t.tv_nsec / 1000000;
 }
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 6a5ecaf..2fabd43 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -1339,5 +1339,9 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
 }
 
 pn_millis_t pn_proactor_now(void) {
+  return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
   return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
 }
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
index 732af7b..0f0a54b 100644
--- a/c/src/proactor/win_iocp.c
+++ b/c/src/proactor/win_iocp.c
@@ -220,8 +220,6 @@ void pni_zombie_check(iocp_t *, pn_timestamp_t);
 pn_timestamp_t pni_zombie_deadline(iocp_t *);
 
 int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
-
-pn_timestamp_t pn_i_now2(void);
 }
 
 // ======================================================================
@@ -532,17 +530,6 @@ size_t pni_write_pipeline_size(write_pipeline_t *pl)
 
 namespace pn_experimental {
 
-pn_timestamp_t pn_i_now2(void)
-{
-  FILETIME now;
-  GetSystemTimeAsFileTime(&now);
-  ULARGE_INTEGER t;
-  t.u.HighPart = now.dwHighDateTime;
-  t.u.LowPart = now.dwLowDateTime;
-  // Convert to milliseconds and adjust base epoch
-  return t.QuadPart / 10000 - 11644473600000;
-}
-
 static void iocp_log(const char *fmt, ...)
 {
   va_list ap;
@@ -1356,7 +1343,7 @@ static void zombie_list_add(iocpdesc_t *iocpd)
     return;
   }
   // Allow 2 seconds for graceful shutdown before releasing socket resource.
-  iocpd->reap_time = pn_i_now2() + 2000;
+  iocpd->reap_time = pn_proactor_now_64() + 2000;
   pn_list_add(iocpd->iocp->zombie_list, iocpd);
 }
 
@@ -1429,7 +1416,7 @@ static void drain_zombie_completions(iocp_t *iocp)
     if (grace > 0 && grace < 60000)
       shutdown_grace = (unsigned) grace;
   }
-  pn_timestamp_t now = pn_i_now2();
+  pn_timestamp_t now = pn_proactor_now_64();
   pn_timestamp_t deadline = now + shutdown_grace;
 
   while (pn_list_size(iocp->zombie_list)) {
@@ -1440,7 +1427,7 @@ static void drain_zombie_completions(iocp_t *iocp)
       iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
       break;
     }
-    now = pn_i_now2();
+    now = pn_proactor_now_64();
   }
   if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
     // Should only happen if really slow TCP handshakes, i.e. total network failure
@@ -1969,9 +1956,9 @@ class reaper {
         // Call with lock
         if (timer_ || !running)
             return;
-        pn_timestamp_t now = pn_i_now2();
+        int64_t now = pn_proactor_now_64();
         pni_zombie_check(iocp_, now);
-        pn_timestamp_t zd = pni_zombie_deadline(iocp_);
+        int64_t zd = pni_zombie_deadline(iocp_);
         if (zd) {
             DWORD tm = (zd > now) ? zd - now : 1;
             if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
@@ -2146,7 +2133,7 @@ static void pconnection_tick(pconnection_t *pc) {
     if(!stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer)) {
       // TODO: handle error
     }
-    uint64_t now = pn_i_now2();
+    uint64_t now = pn_proactor_now_64();
     uint64_t next = pn_transport_tick(t, now);
     if (next) {
       if (!start_timer(pc->context.proactor->timer_queue, &pc->tick_timer, tick_timer_cb, pc, next - now)) {
@@ -3432,11 +3419,9 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
 }
 
 pn_millis_t pn_proactor_now(void) {
-  FILETIME now;
-  GetSystemTimeAsFileTime(&now);
-  ULARGE_INTEGER t;
-  t.u.HighPart = now.dwHighDateTime;
-  t.u.LowPart = now.dwLowDateTime;
-  // Convert to milliseconds and adjust base epoch
-  return t.QuadPart / 10000 - 11644473600000;
+    return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
+  return GetTickCount64();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org