You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/11/14 21:11:49 UTC

svn commit: r1409360 - in /qpid/proton/trunk/proton-c: include/proton/driver.h src/driver.c src/messenger.c src/proton.c

Author: kgiusti
Date: Wed Nov 14 20:11:48 2012
New Revision: 1409360

URL: http://svn.apache.org/viewvc?rev=1409360&view=rev
Log:
PROTON-111: keep timeout handling within the context of the driver

Modified:
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-c/src/proton.c

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Wed Nov 14 20:11:48 2012
@@ -121,15 +121,6 @@ pn_listener_t *pn_driver_listener(pn_dri
  */
 pn_connector_t *pn_driver_connector(pn_driver_t *driver);
 
-/** Get the current time as an AMQP timestamp
- *
- * Returns the current local time in AMQP 1.0 format: milliseconds since Unix Epoch.
- *
- * @param[in] driver the driver
- * @return timestamp
- */
-pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver);
-
 /** Free the driver allocated via pn_driver, and all associated
  *  listeners and connectors.
  *
@@ -270,17 +261,12 @@ void pn_connector_trace(pn_connector_t *
 
 /** Service the given connector.
  *
- * Handle any inbound data, outbound data, or timing events pending on the connector.  If
- * there are any pending timeouts that will need to be processed in the future, then
- * return the deadline for the next outstanding timeout.
+ * Handle any inbound data, outbound data, or timing events pending on
+ * the connector.
  *
  * @param[in] connector the connector to process.
- * @param[in] now the current time in msec offset from Unix Epoch (see AMQP 1.0)
- *
- * @return if non-zero: a deadline - pn_connector_process must be invoked again
- * for the given connector at least once before this deadline expires.
  */
-pn_timestamp_t pn_connector_process(pn_connector_t *connector, pn_timestamp_t now);
+void pn_connector_process(pn_connector_t *connector);
 
 /** Access the listener which opened this connector.
  *

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Wed Nov 14 20:11:48 2012
@@ -61,6 +61,7 @@ struct pn_driver_t {
   size_t nfds;
   int ctrl[2]; //pipe for updating selectable status
   pn_trace_t trace;
+  pn_timestamp_t wakeup;
 };
 
 struct pn_listener_t {
@@ -89,7 +90,7 @@ struct pn_connector_t {
   int status;
   pn_trace_t trace;
   bool closed;
-  time_t wakeup;
+  pn_timestamp_t wakeup;
   void (*read)(pn_connector_t *);
   void (*write) (pn_connector_t *);
   size_t input_size;
@@ -108,6 +109,8 @@ struct pn_connector_t {
 
 /* Impls */
 
+static pn_timestamp_t pn_driver_now(pn_driver_t *driver);
+
 // listener
 
 static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l)
@@ -575,12 +578,10 @@ static pn_timestamp_t pn_connector_tick(
   return pn_transport_tick(ctor->transport, now);
 }
 
-pn_timestamp_t pn_connector_process(pn_connector_t *c, pn_timestamp_t now)
+void pn_connector_process(pn_connector_t *c)
 {
-  pn_timestamp_t next_timeout = 0;
-
   if (c) {
-    if (c->closed) return 0;
+    if (c->closed) return;
 
     if (c->pending_read) {
       c->read(c);
@@ -588,8 +589,7 @@ pn_timestamp_t pn_connector_process(pn_c
     }
     pn_connector_process_input(c);
 
-    next_timeout = pn_connector_tick(c, now);
-    c->pending_tick = (next_timeout != 0);
+    c->wakeup = pn_connector_tick(c, pn_driver_now(c->driver));
 
     pn_connector_process_output(c);
     if (c->pending_write) {
@@ -602,14 +602,20 @@ pn_timestamp_t pn_connector_process(pn_c
         fprintf(stderr, "Closed %s\n", c->name);
       }
       pn_connector_close(c);
-      return 0;
     }
   }
-  return next_timeout;
 }
 
 // driver
 
+// Current wall-clock time in milliseconds since the Unix Epoch; `driver` is not consulted.
+static pn_timestamp_t pn_driver_now(pn_driver_t *driver)
+{
+  struct timeval tv;
+  if (gettimeofday(&tv, NULL) != 0) pn_fatal("gettimeofday failed\n");
+  return (pn_timestamp_t) tv.tv_sec * 1000 + (tv.tv_usec / 1000);
+}
+
 pn_driver_t *pn_driver()
 {
   pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t));
@@ -632,6 +638,7 @@ pn_driver_t *pn_driver()
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
               (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
               (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
+  d->wakeup = 0;
 
   // XXX
   if (pipe(d->ctrl)) {
@@ -693,6 +700,7 @@ static void pn_driver_rebuild(pn_driver_
     d->fds = (struct pollfd *) realloc(d->fds, d->capacity*sizeof(struct pollfd));
   }
 
+  d->wakeup = 0;
   d->nfds = 0;
 
   d->fds[d->nfds].fd = d->ctrl[0];
@@ -714,6 +722,7 @@ static void pn_driver_rebuild(pn_driver_
   for (int i = 0; i < d->connector_count; i++)
   {
     if (!c->closed) {
+      d->wakeup = pn_timestamp_next_expire(d->wakeup, c->wakeup);
       d->fds[d->nfds].fd = c->fd;
       d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
       d->fds[d->nfds].revents = 0;
@@ -731,9 +740,13 @@ void pn_driver_wait_1(pn_driver_t *d)
 
 int pn_driver_wait_2(pn_driver_t *d, int timeout)
 {
-    if (poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout) == -1)
-        return pn_error_from_errno(d->error, "poll");
-    return 0;
+  // Poll for at most `timeout` msec (negative: no limit), but never sleep past d->wakeup.
+  if (d->wakeup) {
+    pn_timestamp_t now = pn_driver_now(d);
+    pn_timestamp_t left = (now >= d->wakeup) ? 0 : d->wakeup - now;
+    if (timeout < 0 || left < (pn_timestamp_t) timeout) timeout = (int) left;  // a bare pn_min() would keep a negative timeout and ignore the deadline
+  }
+  return poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout) == -1 ? pn_error_from_errno(d->error, "poll") : 0;
 }
 
 void pn_driver_wait_3(pn_driver_t *d)
@@ -750,6 +763,7 @@ void pn_driver_wait_3(pn_driver_t *d)
     l = l->listener_next;
   }
 
+  pn_timestamp_t now = pn_driver_now(d);
   pn_connector_t *c = d->connector_head;
   while (c) {
     if (c->closed) {
@@ -760,6 +774,7 @@ void pn_driver_wait_3(pn_driver_t *d)
       int idx = c->idx;
       c->pending_read = (idx && d->fds[idx].revents & POLLIN);
       c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+      c->pending_tick = (c->wakeup &&  c->wakeup <= now);
     }
     c = c->connector_next;
   }
@@ -818,12 +833,3 @@ pn_connector_t *pn_driver_connector(pn_d
 
   return NULL;
 }
-
-
-pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver)
-{
-  struct timeval now;
-  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
-
-  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_usec / 1000);
-}

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Nov 14 20:11:48 2012
@@ -28,6 +28,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
+#include <sys/time.h>
 #include <uuid/uuid.h>
 #include "util.h"
 
@@ -471,26 +472,21 @@ static long int millis(struct timeval tv
 
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
-  pn_timestamp_t ctor_tick = 0;
-  pn_timestamp_t now = pn_driver_timestamp(messenger->driver);
-  pn_timestamp_t deadline = (timeout < 0) ? 0 : now + timeout;
-
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
-    ctor_tick = pn_timestamp_next_expire( pn_connector_process(ctor, now), ctor_tick );
+    pn_connector_process(ctor);
     ctor = pn_connector_next(ctor);
   }
 
+  struct timeval now;
+  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+  long int deadline = millis(now) + timeout;
   bool pred;
-  do {
-    pred = predicate(messenger);
-    if (pred) break;
 
-    int remaining = -1;
-    pn_timestamp_t next_deadline = pn_timestamp_next_expire(deadline, ctor_tick);
-    if (next_deadline) {
-      remaining = (next_deadline > now) ? next_deadline - now : 0;
-    }
+  while (true) {
+    pred = predicate(messenger);
+    int remaining = deadline - millis(now);
+    if (pred || (timeout >= 0 && remaining < 0)) break;
 
     int error = pn_driver_wait(messenger->driver, remaining);
     if (error)
@@ -522,11 +518,9 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connector_set_connection(c, conn);
     }
 
-    now = pn_driver_timestamp(messenger->driver);
-    ctor_tick = 0;
     pn_connector_t *c;
     while ((c = pn_driver_connector(messenger->driver))) {
-      (void) pn_connector_process(c, now);  // timeout ignored - we call it again below
+      pn_connector_process(c);
       pn_connection_t *conn = pn_connector_connection(c);
       pn_messenger_endpoints(messenger, conn);
       if (pn_connector_closed(c)) {
@@ -535,11 +529,14 @@ int pn_messenger_tsync(pn_messenger_t *m
         pn_decref(conn);
         pn_messenger_flow(messenger);
       } else {
-        ctor_tick = pn_timestamp_next_expire( pn_connector_process(c, now), ctor_tick );
+        pn_connector_process(c);
       }
     }
-    now = pn_driver_timestamp(messenger->driver);
-  } while (!deadline || deadline > now);
+
+    if (timeout >= 0) {
+      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+    }
+  }
 
   return pred ? 0 : PN_TIMEOUT;
 }

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1409360&r1=1409359&r2=1409360&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Wed Nov 14 20:11:48 2012
@@ -453,10 +453,6 @@ int main(int argc, char **argv)
 
   parse_url(url, &scheme, &user, &pass, &host, &port, &path);
 
-  pn_timestamp_t c_deadline = 0;
-  pn_timestamp_t now;
-  int timeout;
-
   pn_driver_t *drv = pn_driver();
   if (url) {
     struct client_context ctx = {false, false, count, count, drv, quiet, size, high, low};
@@ -469,36 +465,24 @@ int main(int argc, char **argv)
     if (!ctor) pn_fatal("connector failed\n");
     pn_connector_set_connection(ctor, pn_connection());
     while (!ctx.done) {
-      timeout = -1;
-      now = pn_driver_timestamp(drv);
-      if (c_deadline) {
-        timeout = (c_deadline > now) ? c_deadline - now : 0;
-      }
-      pn_driver_wait(drv, timeout);
+      pn_driver_wait(drv, -1);
       pn_connector_t *c;
       while ((c = pn_driver_connector(drv))) {
-        (void) pn_connector_process(c, now);
+        pn_connector_process(c);
         client_callback(c);
         if (pn_connector_closed(c)) {
 	  pn_connection_free(pn_connector_connection(c));
           pn_connector_free(c);
         } else {
-          c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
-                                                c_deadline);
+          pn_connector_process(c);
         }
       }
     }
   } else {
     struct server_context ctx = {0, quiet, size};
     if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
-    pn_timestamp_t c_deadline = 0;
     while (true) {
-      timeout = -1;
-      now = pn_driver_timestamp(drv);
-      if (c_deadline) {
-        timeout = (c_deadline > now) ? c_deadline - now : 0;
-      }
-      pn_driver_wait(drv, timeout);
+      pn_driver_wait(drv, -1);
       pn_listener_t *l;
       pn_connector_t *c;
 
@@ -508,14 +492,13 @@ int main(int argc, char **argv)
       }
 
       while ((c = pn_driver_connector(drv))) {
-        (void) pn_connector_process(c, now);
+        pn_connector_process(c);
         server_callback(c);
         if (pn_connector_closed(c)) {
 	  pn_connection_free(pn_connector_connection(c));
           pn_connector_free(c);
         } else {
-          c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
-                                                c_deadline);
+          pn_connector_process(c);
         }
       }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org