You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/11/14 21:11:49 UTC
svn commit: r1409360 - in /qpid/proton/trunk/proton-c:
include/proton/driver.h src/driver.c src/messenger.c src/proton.c
Author: kgiusti
Date: Wed Nov 14 20:11:48 2012
New Revision: 1409360
URL: http://svn.apache.org/viewvc?rev=1409360&view=rev
Log:
PROTON-111: keep timeout handling within the context of the driver
Modified:
qpid/proton/trunk/proton-c/include/proton/driver.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-c/src/proton.c
Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Wed Nov 14 20:11:48 2012
@@ -121,15 +121,6 @@ pn_listener_t *pn_driver_listener(pn_dri
*/
pn_connector_t *pn_driver_connector(pn_driver_t *driver);
-/** Get the current time as an AMQP timestamp
- *
- * Returns the current local time in AMQP 1.0 format: milliseconds since Unix Epoch.
- *
- * @param[in] driver the driver
- * @return timestamp
- */
-pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver);
-
/** Free the driver allocated via pn_driver, and all associated
* listeners and connectors.
*
@@ -270,17 +261,12 @@ void pn_connector_trace(pn_connector_t *
/** Service the given connector.
*
- * Handle any inbound data, outbound data, or timing events pending on the connector. If
- * there are any pending timeouts that will need to be processed in the future, then
- * return the deadline for the next outstanding timeout.
+ * Handle any inbound data, outbound data, or timing events pending on
+ * the connector.
*
* @param[in] connector the connector to process.
- * @param[in] now the current time in msec offset from Unix Epoch (see AMQP 1.0)
- *
- * @return if non-zero: a deadline - pn_connector_process must be invoked again
- * for the given connector at least once before this deadline expires.
*/
-pn_timestamp_t pn_connector_process(pn_connector_t *connector, pn_timestamp_t now);
+void pn_connector_process(pn_connector_t *connector);
/** Access the listener which opened this connector.
*
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Wed Nov 14 20:11:48 2012
@@ -61,6 +61,7 @@ struct pn_driver_t {
size_t nfds;
int ctrl[2]; //pipe for updating selectable status
pn_trace_t trace;
+ pn_timestamp_t wakeup;
};
struct pn_listener_t {
@@ -89,7 +90,7 @@ struct pn_connector_t {
int status;
pn_trace_t trace;
bool closed;
- time_t wakeup;
+ pn_timestamp_t wakeup;
void (*read)(pn_connector_t *);
void (*write) (pn_connector_t *);
size_t input_size;
@@ -108,6 +109,8 @@ struct pn_connector_t {
/* Impls */
+static pn_timestamp_t pn_driver_now(pn_driver_t *driver);
+
// listener
static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l)
@@ -575,12 +578,10 @@ static pn_timestamp_t pn_connector_tick(
return pn_transport_tick(ctor->transport, now);
}
-pn_timestamp_t pn_connector_process(pn_connector_t *c, pn_timestamp_t now)
+void pn_connector_process(pn_connector_t *c)
{
- pn_timestamp_t next_timeout = 0;
-
if (c) {
- if (c->closed) return 0;
+ if (c->closed) return;
if (c->pending_read) {
c->read(c);
@@ -588,8 +589,7 @@ pn_timestamp_t pn_connector_process(pn_c
}
pn_connector_process_input(c);
- next_timeout = pn_connector_tick(c, now);
- c->pending_tick = (next_timeout != 0);
+ c->wakeup = pn_connector_tick(c, pn_driver_now(c->driver));
pn_connector_process_output(c);
if (c->pending_write) {
@@ -602,14 +602,20 @@ pn_timestamp_t pn_connector_process(pn_c
fprintf(stderr, "Closed %s\n", c->name);
}
pn_connector_close(c);
- return 0;
}
}
- return next_timeout;
}
// driver
+static pn_timestamp_t pn_driver_now(pn_driver_t *driver)
+{
+ struct timeval now;
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+
+ return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_usec / 1000);
+}
+
pn_driver_t *pn_driver()
{
pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t));
@@ -632,6 +638,7 @@ pn_driver_t *pn_driver()
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
+ d->wakeup = 0;
// XXX
if (pipe(d->ctrl)) {
@@ -693,6 +700,7 @@ static void pn_driver_rebuild(pn_driver_
d->fds = (struct pollfd *) realloc(d->fds, d->capacity*sizeof(struct pollfd));
}
+ d->wakeup = 0;
d->nfds = 0;
d->fds[d->nfds].fd = d->ctrl[0];
@@ -714,6 +722,7 @@ static void pn_driver_rebuild(pn_driver_
for (int i = 0; i < d->connector_count; i++)
{
if (!c->closed) {
+ d->wakeup = pn_timestamp_next_expire(d->wakeup, c->wakeup);
d->fds[d->nfds].fd = c->fd;
d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
d->fds[d->nfds].revents = 0;
@@ -731,9 +740,13 @@ void pn_driver_wait_1(pn_driver_t *d)
int pn_driver_wait_2(pn_driver_t *d, int timeout)
{
- if (poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout) == -1)
- return pn_error_from_errno(d->error, "poll");
- return 0;
+ if (d->wakeup) {
+ pn_timestamp_t now = pn_driver_now(d);
+ timeout = (now >= d->wakeup) ? 0 : pn_min(timeout, d->wakeup - now);
+ }
+ if (poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout) == -1)
+ return pn_error_from_errno(d->error, "poll");
+ return 0;
}
void pn_driver_wait_3(pn_driver_t *d)
@@ -750,6 +763,7 @@ void pn_driver_wait_3(pn_driver_t *d)
l = l->listener_next;
}
+ pn_timestamp_t now = pn_driver_now(d);
pn_connector_t *c = d->connector_head;
while (c) {
if (c->closed) {
@@ -760,6 +774,7 @@ void pn_driver_wait_3(pn_driver_t *d)
int idx = c->idx;
c->pending_read = (idx && d->fds[idx].revents & POLLIN);
c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+ c->pending_tick = (c->wakeup && c->wakeup <= now);
}
c = c->connector_next;
}
@@ -818,12 +833,3 @@ pn_connector_t *pn_driver_connector(pn_d
return NULL;
}
-
-
-pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver)
-{
- struct timeval now;
- if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
-
- return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_usec / 1000);
-}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Nov 14 20:11:48 2012
@@ -28,6 +28,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
+#include <sys/time.h>
#include <uuid/uuid.h>
#include "util.h"
@@ -471,26 +472,21 @@ static long int millis(struct timeval tv
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
- pn_timestamp_t ctor_tick = 0;
- pn_timestamp_t now = pn_driver_timestamp(messenger->driver);
- pn_timestamp_t deadline = (timeout < 0) ? 0 : now + timeout;
-
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
- ctor_tick = pn_timestamp_next_expire( pn_connector_process(ctor, now), ctor_tick );
+ pn_connector_process(ctor);
ctor = pn_connector_next(ctor);
}
+ struct timeval now;
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+ long int deadline = millis(now) + timeout;
bool pred;
- do {
- pred = predicate(messenger);
- if (pred) break;
- int remaining = -1;
- pn_timestamp_t next_deadline = pn_timestamp_next_expire(deadline, ctor_tick);
- if (next_deadline) {
- remaining = (next_deadline > now) ? next_deadline - now : 0;
- }
+ while (true) {
+ pred = predicate(messenger);
+ int remaining = deadline - millis(now);
+ if (pred || (timeout >= 0 && remaining < 0)) break;
int error = pn_driver_wait(messenger->driver, remaining);
if (error)
@@ -522,11 +518,9 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connector_set_connection(c, conn);
}
- now = pn_driver_timestamp(messenger->driver);
- ctor_tick = 0;
pn_connector_t *c;
while ((c = pn_driver_connector(messenger->driver))) {
- (void) pn_connector_process(c, now); // timeout ignored - we call it again below
+ pn_connector_process(c);
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn);
if (pn_connector_closed(c)) {
@@ -535,11 +529,14 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_decref(conn);
pn_messenger_flow(messenger);
} else {
- ctor_tick = pn_timestamp_next_expire( pn_connector_process(c, now), ctor_tick );
+ pn_connector_process(c);
}
}
- now = pn_driver_timestamp(messenger->driver);
- } while (!deadline || deadline > now);
+
+ if (timeout >= 0) {
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+ }
+ }
return pred ? 0 : PN_TIMEOUT;
}
Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Wed Nov 14 20:11:48 2012
@@ -453,10 +453,6 @@ int main(int argc, char **argv)
parse_url(url, &scheme, &user, &pass, &host, &port, &path);
- pn_timestamp_t c_deadline = 0;
- pn_timestamp_t now;
- int timeout;
-
pn_driver_t *drv = pn_driver();
if (url) {
struct client_context ctx = {false, false, count, count, drv, quiet, size, high, low};
@@ -469,36 +465,24 @@ int main(int argc, char **argv)
if (!ctor) pn_fatal("connector failed\n");
pn_connector_set_connection(ctor, pn_connection());
while (!ctx.done) {
- timeout = -1;
- now = pn_driver_timestamp(drv);
- if (c_deadline) {
- timeout = (c_deadline > now) ? c_deadline - now : 0;
- }
- pn_driver_wait(drv, timeout);
+ pn_driver_wait(drv, -1);
pn_connector_t *c;
while ((c = pn_driver_connector(drv))) {
- (void) pn_connector_process(c, now);
+ pn_connector_process(c);
client_callback(c);
if (pn_connector_closed(c)) {
pn_connection_free(pn_connector_connection(c));
pn_connector_free(c);
} else {
- c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
- c_deadline);
+ pn_connector_process(c);
}
}
}
} else {
struct server_context ctx = {0, quiet, size};
if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
- pn_timestamp_t c_deadline = 0;
while (true) {
- timeout = -1;
- now = pn_driver_timestamp(drv);
- if (c_deadline) {
- timeout = (c_deadline > now) ? c_deadline - now : 0;
- }
- pn_driver_wait(drv, timeout);
+ pn_driver_wait(drv, -1);
pn_listener_t *l;
pn_connector_t *c;
@@ -508,14 +492,13 @@ int main(int argc, char **argv)
}
while ((c = pn_driver_connector(drv))) {
- (void) pn_connector_process(c, now);
+ pn_connector_process(c);
server_callback(c);
if (pn_connector_closed(c)) {
pn_connection_free(pn_connector_connection(c));
pn_connector_free(c);
} else {
- c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
- c_deadline);
+ pn_connector_process(c);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org