You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jd...@apache.org on 2019/11/14 14:08:13 UTC
[qpid-proton] branch master updated: PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)
This is an automated email from the ASF dual-hosted git repository.
jdanek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new aa02108 PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)
aa02108 is described below
commit aa021087a0de81c99041b325779480041d60e0e8
Author: Jiří Daněk <jd...@redhat.com>
AuthorDate: Thu Nov 14 15:08:03 2019 +0100
PROTON-2030 Use CLOCK_MONOTONIC in proactors for pn_transport_tick (#180)
- deprecates pn_proactor_now in favour of new pn_proactor_now_64
- replaces use of pn_i_now2 with pn_proactor_now_64
- uses GetTickCount64() in iocp proactor, instead of GetSystemTimeAsFileTime
---
c/include/proton/proactor.h | 16 ++++++++++++++++
c/include/proton/transport.h | 2 +-
c/src/core/engine-internal.h | 2 +-
c/src/core/transport.c | 6 +++---
c/src/proactor/epoll.c | 21 +++++++++------------
c/src/proactor/libuv.c | 4 ++++
c/src/proactor/win_iocp.c | 37 +++++++++++--------------------------
7 files changed, 45 insertions(+), 43 deletions(-)
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index d4fbdd8..88544bc 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -299,6 +299,8 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
/**
+ * @deprecated Use ::pn_proactor_now_64()
+ *
* Get the real elapsed time since an arbitrary point in the past in milliseconds.
*
* This may be used as a portable way to get a process-local timestamp for the
@@ -313,6 +315,20 @@ PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
PNP_EXTERN pn_millis_t pn_proactor_now(void);
/**
+ * Get the real elapsed time since an arbitrary point in the past in milliseconds.
+ *
+ * This may be used as a portable way to get a process-local timestamp for the
+ * current time. It is monotonically increasing and will never go backwards.
+ *
+ * Note: this is not a suitable value for an AMQP timestamp to be sent as part
+ * of a message. Such a timestamp should use the real time in milliseconds
+ * since the epoch.
+ *
+ * @note Thread-safe
+ */
+PNP_EXTERN int64_t pn_proactor_now_64(void);
+
+/**
* @}
*/
diff --git a/c/include/proton/transport.h b/c/include/proton/transport.h
index dbde8b1..d7693e0 100644
--- a/c/include/proton/transport.h
+++ b/c/include/proton/transport.h
@@ -695,7 +695,7 @@ PN_EXTERN bool pn_transport_closed(pn_transport_t *transport);
*
* @param[in] transport the transport to process.
* @param[in] now A monotonically-increasing time value in milliseconds.
- * Does not need to be wall-clock time or a valid AMQP timestamp, but must increase montonically.
+ * Does not need to be wall-clock time or a valid AMQP timestamp, but must increase monotonically.
*
* @return If non-zero, then the monotonic expiration time of the next pending
* timer event for the transport. The caller must invoke pn_transport_tick
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 1e56562..b59d1fc 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -104,7 +104,7 @@ typedef struct pn_io_layer_t {
ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
void (*handle_error)(struct pn_transport_t* transport, unsigned int layer);
- pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+ int64_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, int64_t);
size_t (*buffered_output)(struct pn_transport_t *transport); // how much output is held
} pn_io_layer_t;
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index f8a2d97..4dea853 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -146,7 +146,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer,
static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
static void pn_error_amqp(pn_transport_t *transport, unsigned int layer);
-static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now);
+static int64_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, int64_t now);
static ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
static ssize_t pn_io_layer_output_null(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
@@ -2628,7 +2628,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer,
}
/* process AMQP related timer events */
-static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now)
+static int64_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, int64_t now)
{
pn_timestamp_t timeout = 0;
@@ -2923,7 +2923,7 @@ pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
int64_t pn_transport_tick(pn_transport_t *transport, int64_t now)
{
- pn_timestamp_t r = 0;
+ int64_t r = 0;
for (int i = 0; i<PN_IO_LAYER_CT; ++i) {
if (transport->io_layers[i] && transport->io_layers[i]->process_tick)
r = pn_timestamp_min(r, transport->io_layers[i]->process_tick(transport, i, now));
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 677d6b6..a360ff0 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -199,7 +199,7 @@ static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
// EPOLLIN is possible but not assured
pt->in_doubt = true;
}
- pt->timer_active = t_millis;
+ pt->timer_active = t_millis != 0;
}
static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
@@ -262,13 +262,6 @@ static void ptimer_finalize(ptimer_t *pt) {
pmutex_finalize(&pt->mutex);
}
-pn_timestamp_t pn_i_now2(void)
-{
- struct timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
- return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
-}
-
// ========================================================================
// Proactor common code
// ========================================================================
@@ -562,7 +555,7 @@ typedef struct pconnection_t {
epoll_extended_t *rearm_target; /* main or secondary epollfd */
} pconnection_t;
-/* Protects read/update of pn_connnection_t pointer to it's pconnection_t
+/* Protects read/update of pn_connection_t pointer to it's pconnection_t
*
* Global because pn_connection_wake()/pn_connection_proactor() navigate from
* the pn_connection_t before we know the proactor or driver. Critical sections
@@ -588,7 +581,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
}
/*
- * A listener can have mutiple sockets (as specified in the addrinfo). They
+ * A listener can have multiple sockets (as specified in the addrinfo). They
* are armed separately. The individual psockets can be part of at most one
* list: the global proactor overflow retry list or the per-listener list of
* pending accepts (valid inbound socket obtained, but pn_listener_accept not
@@ -1426,7 +1419,7 @@ static void pconnection_tick(pconnection_t *pc) {
pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
ptimer_set(&pc->timer, 0);
- uint64_t now = pn_i_now2();
+ uint64_t now = pn_proactor_now_64();
uint64_t next = pn_transport_tick(t, now);
if (next) {
ptimer_set(&pc->timer, next - now);
@@ -2341,7 +2334,11 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
}
pn_millis_t pn_proactor_now(void) {
+ return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
struct timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
- return t.tv_sec*1000 + t.tv_nsec/1000000;
+ return t.tv_sec * 1000 + t.tv_nsec / 1000000;
}
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 6a5ecaf..2fabd43 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -1339,5 +1339,9 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
}
pn_millis_t pn_proactor_now(void) {
+ return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
}
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
index 732af7b..0f0a54b 100644
--- a/c/src/proactor/win_iocp.c
+++ b/c/src/proactor/win_iocp.c
@@ -220,8 +220,6 @@ void pni_zombie_check(iocp_t *, pn_timestamp_t);
pn_timestamp_t pni_zombie_deadline(iocp_t *);
int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
-
-pn_timestamp_t pn_i_now2(void);
}
// ======================================================================
@@ -532,17 +530,6 @@ size_t pni_write_pipeline_size(write_pipeline_t *pl)
namespace pn_experimental {
-pn_timestamp_t pn_i_now2(void)
-{
- FILETIME now;
- GetSystemTimeAsFileTime(&now);
- ULARGE_INTEGER t;
- t.u.HighPart = now.dwHighDateTime;
- t.u.LowPart = now.dwLowDateTime;
- // Convert to milliseconds and adjust base epoch
- return t.QuadPart / 10000 - 11644473600000;
-}
-
static void iocp_log(const char *fmt, ...)
{
va_list ap;
@@ -1356,7 +1343,7 @@ static void zombie_list_add(iocpdesc_t *iocpd)
return;
}
// Allow 2 seconds for graceful shutdown before releasing socket resource.
- iocpd->reap_time = pn_i_now2() + 2000;
+ iocpd->reap_time = pn_proactor_now_64() + 2000;
pn_list_add(iocpd->iocp->zombie_list, iocpd);
}
@@ -1429,7 +1416,7 @@ static void drain_zombie_completions(iocp_t *iocp)
if (grace > 0 && grace < 60000)
shutdown_grace = (unsigned) grace;
}
- pn_timestamp_t now = pn_i_now2();
+ pn_timestamp_t now = pn_proactor_now_64();
pn_timestamp_t deadline = now + shutdown_grace;
while (pn_list_size(iocp->zombie_list)) {
@@ -1440,7 +1427,7 @@ static void drain_zombie_completions(iocp_t *iocp)
iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
break;
}
- now = pn_i_now2();
+ now = pn_proactor_now_64();
}
if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
// Should only happen if really slow TCP handshakes, i.e. total network failure
@@ -1969,9 +1956,9 @@ class reaper {
// Call with lock
if (timer_ || !running)
return;
- pn_timestamp_t now = pn_i_now2();
+ int64_t now = pn_proactor_now_64();
pni_zombie_check(iocp_, now);
- pn_timestamp_t zd = pni_zombie_deadline(iocp_);
+ int64_t zd = pni_zombie_deadline(iocp_);
if (zd) {
DWORD tm = (zd > now) ? zd - now : 1;
if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
@@ -2146,7 +2133,7 @@ static void pconnection_tick(pconnection_t *pc) {
if(!stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer)) {
// TODO: handle error
}
- uint64_t now = pn_i_now2();
+ uint64_t now = pn_proactor_now_64();
uint64_t next = pn_transport_tick(t, now);
if (next) {
if (!start_timer(pc->context.proactor->timer_queue, &pc->tick_timer, tick_timer_cb, pc, next - now)) {
@@ -3432,11 +3419,9 @@ const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
}
pn_millis_t pn_proactor_now(void) {
- FILETIME now;
- GetSystemTimeAsFileTime(&now);
- ULARGE_INTEGER t;
- t.u.HighPart = now.dwHighDateTime;
- t.u.LowPart = now.dwLowDateTime;
- // Convert to milliseconds and adjust base epoch
- return t.QuadPart / 10000 - 11644473600000;
+ return (pn_millis_t) pn_proactor_now_64();
+}
+
+int64_t pn_proactor_now_64(void) {
+ return GetTickCount64();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org