You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/11/14 21:11:39 UTC

svn commit: r1409359 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-c/src/dispatcher/ proton-c/src/engine/ proton-j/src/main/scripts/ tests/proton_tests/

Author: kgiusti
Date: Wed Nov 14 20:11:37 2012
New Revision: 1409359

URL: http://svn.apache.org/viewvc?rev=1409359&view=rev
Log:
PROTON-111: first pass at a solution

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/include/proton/types.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-c/src/proton.c
    qpid/proton/trunk/proton-c/src/util.c
    qpid/proton/trunk/proton-c/src/util.h
    qpid/proton/trunk/proton-j/src/main/scripts/proton.py
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Nov 14 20:11:37 2012
@@ -2140,6 +2140,7 @@ class Transport(object):
     else:
       return self._check(n)
 
+  # AMQP 1.0 max-frame-size
   def _get_max_frame_size(self):
     return pn_transport_get_max_frame(self._trans)
 
@@ -2155,6 +2156,30 @@ Sets the maximum size for received frame
   def remote_max_frame_size(self):
     return pn_transport_get_remote_max_frame(self._trans)
 
+  # AMQP 1.0 idle-time-out
+  def _get_idle_timeout(self):
+    return pn_transport_get_idle_timeout(self._trans)
+
+  def _set_idle_timeout(self, value):
+    pn_transport_set_idle_timeout(self._trans, value)
+
+  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+                          doc="""
+The idle timeout of the connection (in milliseconds).
+""")
+
+  @property
+  def remote_idle_timeout(self):
+    return pn_transport_get_remote_idle_timeout(self._trans)
+
+  @property
+  def frames_output(self):
+    return pn_transport_get_frames_output(self._trans)
+
+  @property
+  def frames_input(self):
+    return pn_transport_get_frames_input(self._trans)
+
 class SASLException(TransportException):
   pass
 

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Wed Nov 14 20:11:37 2012
@@ -121,6 +121,15 @@ pn_listener_t *pn_driver_listener(pn_dri
  */
 pn_connector_t *pn_driver_connector(pn_driver_t *driver);
 
+/** Get the current time as an AMQP timestamp
+ *
+ * Returns the current local time in AMQP 1.0 format: milliseconds since Unix Epoch.
+ *
+ * @param[in] driver the driver
+ * @return timestamp
+ */
+pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver);
+
 /** Free the driver allocated via pn_driver, and all associated
  *  listeners and connectors.
  *
@@ -261,12 +270,17 @@ void pn_connector_trace(pn_connector_t *
 
 /** Service the given connector.
  *
- * Handle any inbound data, outbound data, or timing events pending on
- * the connector.
+ * Handle any inbound data, outbound data, or timing events pending on the connector.  If
+ * there are any pending timeouts that will need to be processed in the future, then
+ * return the deadline for the next outstanding timeout.
  *
  * @param[in] connector the connector to process.
+ * @param[in] now the current time in msec offset from Unix Epoch (see AMQP 1.0)
+ *
+ * @return if non-zero: a deadline - pn_connector_process must be invoked again
+ * for the given connector at least once before this deadline expires.
  */
-void pn_connector_process(pn_connector_t *connector);
+pn_timestamp_t pn_connector_process(pn_connector_t *connector, pn_timestamp_t now);
 
 /** Access the listener which opened this connector.
  *

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Wed Nov 14 20:11:37 2012
@@ -275,12 +275,31 @@ void pn_connection_set_context(pn_connec
 pn_error_t *pn_transport_error(pn_transport_t *transport);
 ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
 ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
-time_t pn_transport_tick(pn_transport_t *transport, time_t now);
+/** Process any pending transport timer events.
+ *
+ * This method should be called after all pending input has been processed by the
+ * transport (see ::pn_transport_input), and before generating output (see
+ * ::pn_transport_output).  It returns the deadline for the next pending timer event, if
+ * any are present.
+ *
+ * @param[in] transport the transport to process.
+ *
+ * @return if non-zero, then the expiration time of the next pending timer event for the
+ * transport.  The caller must invoke pn_transport_tick again prior to the expiration of
+ * this deadline.
+ */
+pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now);
 void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace);
 // max frame of zero means "unlimited"
 uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
 void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);
 uint32_t pn_transport_get_remote_max_frame(pn_transport_t *transport);
+/* timeout of zero means "no timeout" */
+pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport);
+void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout);
+pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
+uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
+uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
 void pn_transport_free(pn_transport_t *transport);
 
 // session

Modified: qpid/proton/trunk/proton-c/include/proton/types.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/types.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/types.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/types.h Wed Nov 14 20:11:37 2012
@@ -32,7 +32,7 @@ extern "C" {
 typedef int32_t  pn_sequence_t;
 typedef uint32_t pn_millis_t;
 typedef uint32_t pn_seconds_t;
-typedef uint64_t pn_timestamp_t;
+typedef int64_t  pn_timestamp_t;
 typedef uint32_t pn_char_t;
 typedef uint32_t pn_decimal32_t;
 typedef uint64_t pn_decimal64_t;

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Wed Nov 14 20:11:37 2012
@@ -115,6 +115,12 @@ void pn_dispatcher_trace(pn_dispatcher_t
 
 int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
 {
+  if (frame.size == 0) { // ignore null frames
+    if (disp->trace & PN_TRACE_FRM)
+      pn_dispatcher_trace(disp, frame.channel, "<- (EMPTY FRAME)\n");
+    return 0;
+  }
+
   ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
   if (dsize < 0) {
     fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
@@ -182,6 +188,7 @@ ssize_t pn_dispatcher_input(pn_dispatche
 
     size_t n = pn_read_frame(&frame, bytes + read, available - read);
     if (n) {
+      disp->input_frames_ct += 1;
       int e = pn_dispatch_frame(disp, frame);
       if (e) return e;
       read += n;
@@ -272,6 +279,7 @@ int pn_post_frame(pn_dispatcher_t *disp,
     disp->capacity *= 2;
     disp->output = (char *) realloc(disp->output, disp->capacity);
   }
+  disp->output_frames_ct += 1;
   if (disp->trace & PN_TRACE_RAW) {
     fprintf(stderr, "RAW: \"");
     pn_fprint_data(stderr, disp->output + disp->available, n);
@@ -374,6 +382,7 @@ int pn_post_transfer_frame(pn_dispatcher
       disp->capacity *= 2;
       disp->output = realloc(disp->output, disp->capacity);
     }
+    disp->output_frames_ct += 1;
     if (disp->trace & PN_TRACE_RAW) {
       fprintf(stderr, "RAW: \"");
       pn_fprint_data(stderr, disp->output + disp->available, n);
@@ -385,4 +394,3 @@ int pn_post_transfer_frame(pn_dispatcher
   disp->output_payload = NULL;
   return 0;
 }
-

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Wed Nov 14 20:11:37 2012
@@ -52,11 +52,13 @@ struct pn_dispatcher_t {
   size_t remote_max_frame;
   pn_buffer_t *frame;  // frame under construction
   size_t capacity;
-  size_t available;
+  size_t available; /* number of raw bytes pending output */
   char *output;
   void *context;
   bool halt;
   bool batch;
+  uint64_t output_frames_ct;
+  uint64_t input_frames_ct;
   char scratch[SCRATCH];
 };
 

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Wed Nov 14 20:11:37 2012
@@ -30,11 +30,13 @@
 #include <netdb.h>
 #include <unistd.h>
 #include <fcntl.h>
+#include <sys/time.h>
 
 #include <proton/driver.h>
 #include <proton/error.h>
 #include <proton/sasl.h>
 #include <proton/ssl.h>
+#include <proton/util.h>
 #include "util.h"
 #include "ssl/ssl-internal.h"
 
@@ -90,7 +92,6 @@ struct pn_connector_t {
   time_t wakeup;
   void (*read)(pn_connector_t *);
   void (*write) (pn_connector_t *);
-  time_t (*tick)(pn_connector_t *sel, time_t now);
   size_t input_size;
   char input[IO_BUF_SIZE];
   bool input_eos;
@@ -342,7 +343,6 @@ pn_connector_t *pn_connector(pn_driver_t
 
 static void pn_connector_read(pn_connector_t *ctor);
 static void pn_connector_write(pn_connector_t *ctor);
-static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
 
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
@@ -365,7 +365,6 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->wakeup = 0;
   c->read = pn_connector_read;
   c->write = pn_connector_write;
-  c->tick = pn_connector_tick;
   c->input_size = 0;
   c->input_eos = false;
   c->output_size = 0;
@@ -570,31 +569,28 @@ static void pn_connector_write(pn_connec
     ctor->status &= ~PN_SEL_WR;
 }
 
-static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
+static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
-  // XXX: should probably have a function pointer for this and switch it with different layers
-  time_t result = pn_transport_tick(ctor->transport, now);
-  pn_connector_process_input(ctor);
-  pn_connector_process_output(ctor);
-  return result;
+  return pn_transport_tick(ctor->transport, now);
 }
 
-void pn_connector_process(pn_connector_t *c) {
-  if (c) {
-    if (c->closed) return;
+pn_timestamp_t pn_connector_process(pn_connector_t *c, pn_timestamp_t now)
+{
+  pn_timestamp_t next_timeout = 0;
 
-    if (c->pending_tick) {
-      // XXX: should handle timing also
-      c->tick(c, 0);
-      c->pending_tick = false;
-    }
+  if (c) {
+    if (c->closed) return 0;
 
     if (c->pending_read) {
       c->read(c);
       c->pending_read = false;
     }
     pn_connector_process_input(c);
+
+    next_timeout = pn_connector_tick(c, now);
+    c->pending_tick = (next_timeout != 0);
+
     pn_connector_process_output(c);
     if (c->pending_write) {
       c->write(c);
@@ -606,8 +602,10 @@ void pn_connector_process(pn_connector_t
         fprintf(stderr, "Closed %s\n", c->name);
       }
       pn_connector_close(c);
+      return 0;
     }
   }
+  return next_timeout;
 }
 
 // driver
@@ -820,3 +818,12 @@ pn_connector_t *pn_driver_connector(pn_d
 
   return NULL;
 }
+
+
+pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver)
+{
+  struct timeval now;
+  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+
+  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_usec / 1000);
+}

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed Nov 14 20:11:37 2012
@@ -99,6 +99,7 @@ typedef struct {
 struct pn_transport_t {
   ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
   ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+  pn_timestamp_t (*process_tick)(pn_transport_t *, pn_timestamp_t);
   size_t header_count;
   pn_sasl_t *sasl;
   pn_ssl_t *ssl;
@@ -114,6 +115,17 @@ struct pn_transport_t {
   pn_data_t *remote_desired_capabilities;
   uint32_t   local_max_frame;
   uint32_t   remote_max_frame;
+
+  /* dead remote detection */
+  pn_millis_t local_idle_timeout;
+  pn_timestamp_t dead_remote_deadline;
+  uint64_t last_bytes_input;
+
+  /* keepalive */
+  pn_millis_t remote_idle_timeout;
+  pn_timestamp_t keepalive_deadline;
+  uint64_t last_bytes_output;
+
   pn_error_t *error;
   pn_session_state_t *sessions;
   size_t session_capacity;
@@ -121,6 +133,10 @@ struct pn_transport_t {
   size_t channel_capacity;
   const char *condition;
   char scratch[SCRATCH];
+
+  /* statistics */
+  uint64_t bytes_input;
+  uint64_t bytes_output;
 };
 
 struct pn_connection_t {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed Nov 14 20:11:37 2012
@@ -735,11 +735,13 @@ static ssize_t pn_output_write_sasl_head
 static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
 static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
 static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now);
 
 void pn_transport_init(pn_transport_t *transport)
 {
   transport->process_input = pn_input_read_amqp_header;
   transport->process_output = pn_output_write_amqp_header;
+  transport->process_tick = NULL;
   transport->header_count = 0;
   transport->sasl = NULL;
   transport->ssl = NULL;
@@ -763,6 +765,12 @@ void pn_transport_init(pn_transport_t *t
   transport->remote_hostname = NULL;
   transport->local_max_frame = 0;
   transport->remote_max_frame = 0;
+  transport->local_idle_timeout = 0;
+  transport->dead_remote_deadline = 0;
+  transport->last_bytes_input = 0;
+  transport->remote_idle_timeout = 0;
+  transport->keepalive_deadline = 0;
+  transport->last_bytes_output = 0;
   transport->remote_offered_capabilities = pn_data(16);
   transport->remote_desired_capabilities = pn_data(16);
   transport->error = pn_error();
@@ -774,6 +782,9 @@ void pn_transport_init(pn_transport_t *t
   transport->channel_capacity = 0;
 
   transport->condition = NULL;
+
+  transport->bytes_input = 0;
+  transport->bytes_output = 0;
 }
 
 pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -1331,9 +1342,10 @@ int pn_do_open(pn_dispatcher_t *disp)
   pn_bytes_t remote_container, remote_hostname;
   pn_data_clear(transport->remote_offered_capabilities);
   pn_data_clear(transport->remote_desired_capabilities);
-  int err = pn_scan_args(disp, "D.[?S?SI....CC]", &container_q,
+  int err = pn_scan_args(disp, "D.[?S?SI.I..CC]", &container_q,
                          &remote_container, &hostname_q, &remote_hostname,
                          &transport->remote_max_frame,
+                         &transport->remote_idle_timeout,
                          transport->remote_offered_capabilities,
                          transport->remote_desired_capabilities);
   if (err) return err;
@@ -1362,6 +1374,8 @@ int pn_do_open(pn_dispatcher_t *disp)
   } else {
     transport->disp->halt = true;
   }
+  if (transport->remote_idle_timeout)
+    transport->process_tick = pn_process_tick;  // enable timeouts
   transport->open_rcvd = true;
   return 0;
 }
@@ -1758,6 +1772,7 @@ ssize_t pn_transport_input(pn_transport_
     }
   }
 
+  transport->bytes_input += consumed;
   return consumed;
 }
 
@@ -1839,6 +1854,43 @@ static ssize_t pn_input_read_amqp(pn_tra
   }
 }
 
+/* process AMQP related timer events */
+static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now)
+{
+  pn_timestamp_t timeout = 0;
+
+  if (transport->local_idle_timeout) {
+    if (transport->dead_remote_deadline == 0 ||
+        transport->last_bytes_input != transport->bytes_input) {
+      transport->dead_remote_deadline = now + transport->local_idle_timeout;
+      transport->last_bytes_input = transport->bytes_input;
+    } else if (transport->dead_remote_deadline <= now) {
+      transport->dead_remote_deadline = now + transport->local_idle_timeout;
+      // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
+      pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+    }
+    timeout = transport->dead_remote_deadline;
+  }
+
+  // Prevent remote idle timeout as describe by AMQP 1.0:
+  if (transport->remote_idle_timeout && !transport->close_sent) {
+    if (transport->keepalive_deadline == 0 ||
+        transport->last_bytes_output != transport->bytes_output) {
+        transport->keepalive_deadline = now + (transport->remote_idle_timeout/2.0);
+        transport->last_bytes_output = transport->bytes_output;
+      } else if (transport->keepalive_deadline <= now) {
+        transport->keepalive_deadline = now + (transport->remote_idle_timeout/2.0);
+        if (transport->disp->available == 0) {    // no outbound data ready
+          pn_post_frame(transport->disp, 0, "");  // so send empty frame
+          transport->last_bytes_output += 8;      // and account for it!
+      }
+    }
+    timeout = pn_timestamp_next_expire( timeout, transport->keepalive_deadline );
+  }
+
+  return timeout;
+}
+
 bool pn_delivery_buffered(pn_delivery_t *delivery)
 {
   if (delivery->settled) return false;
@@ -1861,11 +1913,12 @@ int pn_process_conn_setup(pn_transport_t
     if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
     {
       pn_connection_t *connection = (pn_connection_t *) endpoint;
-      int err = pn_post_frame(transport->disp, 0, "DL[SS?InnnnCC]", OPEN,
+      int err = pn_post_frame(transport->disp, 0, "DL[SS?In?InnCC]", OPEN,
                               connection->container,
                               connection->hostname,
-                              // if not zero, advertise our max frame size
+                              // if not zero, advertise our max frame size and idle timeout
                               (bool)transport->local_max_frame, transport->local_max_frame,
+                              (bool)transport->local_idle_timeout, transport->local_idle_timeout,
                               connection->offered_capabilities,
                               connection->desired_capabilities);
       if (err) return err;
@@ -2421,7 +2474,7 @@ ssize_t pn_transport_output(pn_transport
       break;
     } else if (n == PN_EOS) {
       if (total > 0) {
-        return total;
+        break;
       } else {
         if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
           pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2435,6 +2488,7 @@ ssize_t pn_transport_output(pn_transport
     }
   }
 
+  transport->bytes_output += total;
   return total;
 }
 
@@ -2462,6 +2516,22 @@ uint32_t pn_transport_get_remote_max_fra
   return transport->remote_max_frame;
 }
 
+pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport)
+{
+  return transport->local_idle_timeout;
+}
+
+void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
+{
+  transport->local_idle_timeout = timeout;
+  transport->process_tick = pn_process_tick;
+}
+
+pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
+{
+  return transport->remote_idle_timeout;
+}
+
 void pn_link_offered(pn_link_t *sender, int credit)
 {
   sender->available = credit;
@@ -2520,8 +2590,24 @@ void pn_link_drain(pn_link_t *receiver, 
   }
 }
 
-time_t pn_transport_tick(pn_transport_t *engine, time_t now)
+pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
+{
+  if (transport && transport->process_tick)
+    return transport->process_tick( transport, now );
+  return 0;
+}
+
+uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
+{
+  if (transport && transport->disp)
+    return transport->disp->output_frames_ct;
+  return 0;
+}
+
+uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
 {
+  if (transport && transport->disp)
+    return transport->disp->input_frames_ct;
   return 0;
 }
 

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Nov 14 20:11:37 2012
@@ -28,7 +28,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
-#include <sys/time.h>
 #include <uuid/uuid.h>
 #include "util.h"
 
@@ -472,21 +471,26 @@ static long int millis(struct timeval tv
 
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
+  pn_timestamp_t ctor_tick = 0;
+  pn_timestamp_t now = pn_driver_timestamp(messenger->driver);
+  pn_timestamp_t deadline = (timeout < 0) ? 0 : now + timeout;
+
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
-    pn_connector_process(ctor);
+    ctor_tick = pn_timestamp_next_expire( pn_connector_process(ctor, now), ctor_tick );
     ctor = pn_connector_next(ctor);
   }
 
-  struct timeval now;
-  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
-  long int deadline = millis(now) + timeout;
   bool pred;
-
-  while (true) {
+  do {
     pred = predicate(messenger);
-    int remaining = deadline - millis(now);
-    if (pred || (timeout >= 0 && remaining < 0)) break;
+    if (pred) break;
+
+    int remaining = -1;
+    pn_timestamp_t next_deadline = pn_timestamp_next_expire(deadline, ctor_tick);
+    if (next_deadline) {
+      remaining = (next_deadline > now) ? next_deadline - now : 0;
+    }
 
     int error = pn_driver_wait(messenger->driver, remaining);
     if (error)
@@ -518,9 +522,11 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connector_set_connection(c, conn);
     }
 
+    now = pn_driver_timestamp(messenger->driver);
+    ctor_tick = 0;
     pn_connector_t *c;
     while ((c = pn_driver_connector(messenger->driver))) {
-      pn_connector_process(c);
+      (void) pn_connector_process(c, now);  // timeout ignored - we call it again below
       pn_connection_t *conn = pn_connector_connection(c);
       pn_messenger_endpoints(messenger, conn);
       if (pn_connector_closed(c)) {
@@ -529,14 +535,11 @@ int pn_messenger_tsync(pn_messenger_t *m
         pn_decref(conn);
         pn_messenger_flow(messenger);
       } else {
-        pn_connector_process(c);
+        ctor_tick = pn_timestamp_next_expire( pn_connector_process(c, now), ctor_tick );
       }
     }
-
-    if (timeout >= 0) {
-      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
-    }
-  }
+    now = pn_driver_timestamp(messenger->driver);
+  } while (!deadline || deadline > now);
 
   return pred ? 0 : PN_TIMEOUT;
 }

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Wed Nov 14 20:11:37 2012
@@ -453,6 +453,10 @@ int main(int argc, char **argv)
 
   parse_url(url, &scheme, &user, &pass, &host, &port, &path);
 
+  pn_timestamp_t c_deadline = 0;
+  pn_timestamp_t now;
+  int timeout;
+
   pn_driver_t *drv = pn_driver();
   if (url) {
     struct client_context ctx = {false, false, count, count, drv, quiet, size, high, low};
@@ -465,24 +469,36 @@ int main(int argc, char **argv)
     if (!ctor) pn_fatal("connector failed\n");
     pn_connector_set_connection(ctor, pn_connection());
     while (!ctx.done) {
-      pn_driver_wait(drv, -1);
+      timeout = -1;
+      now = pn_driver_timestamp(drv);
+      if (c_deadline) {
+        timeout = (c_deadline > now) ? c_deadline - now : 0;
+      }
+      pn_driver_wait(drv, timeout);
       pn_connector_t *c;
       while ((c = pn_driver_connector(drv))) {
-        pn_connector_process(c);
+        (void) pn_connector_process(c, now);
         client_callback(c);
         if (pn_connector_closed(c)) {
 	  pn_connection_free(pn_connector_connection(c));
           pn_connector_free(c);
         } else {
-          pn_connector_process(c);
+          c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
+                                                c_deadline);
         }
       }
     }
   } else {
     struct server_context ctx = {0, quiet, size};
     if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
+    pn_timestamp_t c_deadline = 0;
     while (true) {
-      pn_driver_wait(drv, -1);
+      timeout = -1;
+      now = pn_driver_timestamp(drv);
+      if (c_deadline) {
+        timeout = (c_deadline > now) ? c_deadline - now : 0;
+      }
+      pn_driver_wait(drv, timeout);
       pn_listener_t *l;
       pn_connector_t *c;
 
@@ -492,13 +508,14 @@ int main(int argc, char **argv)
       }
 
       while ((c = pn_driver_connector(drv))) {
-        pn_connector_process(c);
+        (void) pn_connector_process(c, now);
         server_callback(c);
         if (pn_connector_closed(c)) {
 	  pn_connection_free(pn_connector_connection(c));
           pn_connector_free(c);
         } else {
-          pn_connector_process(c);
+          c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
+                                                c_deadline);
         }
       }
     }

Modified: qpid/proton/trunk/proton-c/src/util.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.c (original)
+++ qpid/proton/trunk/proton-c/src/util.c Wed Nov 14 20:11:37 2012
@@ -30,6 +30,8 @@
 #include <strings.h> // For non C89/C99 strcasecmp
 #include <proton/error.h>
 #include <proton/util.h>
+#include <proton/types.h>
+#include "util.h"
 
 ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size)
 {
@@ -165,3 +167,12 @@ char *pn_strndup(const char *src, size_t
     return NULL;
   }
 }
+
+// which timestamp will expire next, or zero if none set
+pn_timestamp_t pn_timestamp_next_expire( pn_timestamp_t a, pn_timestamp_t b )
+{
+  if (a && b) return pn_min(a, b);
+  if (a) return a;
+  return b;
+}
+

Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Wed Nov 14 20:11:37 2012
@@ -28,11 +28,13 @@
 #include <stdlib.h>
 #include <string.h>
 #include <sys/types.h>
+#include <proton/types.h>
 
 ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size);
 void pn_fprint_data(FILE *stream, const char *bytes, size_t size);
 void pn_print_data(const char *bytes, size_t size);
 bool pn_env_bool(const char *name);
+pn_timestamp_t pn_timestamp_next_expire(pn_timestamp_t a, pn_timestamp_t b);
 
 #define DIE_IFR(EXPR, STRERR)                                           \
   do {                                                                  \

Modified: qpid/proton/trunk/proton-j/src/main/scripts/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/scripts/proton.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/src/main/scripts/proton.py Wed Nov 14 20:11:37 2012
@@ -412,6 +412,35 @@ Sets the maximum size for received frame
     #return pn_transport_get_remote_max_frame(self._trans)
     raise Skipped()
 
+  # AMQP 1.0 idle-time-out
+  def _get_idle_timeout(self):
+    #return pn_transport_get_idle_timeout(self._trans)
+    raise Skipped()
+
+  def _set_idle_timeout(self, value):
+    #pn_transport_set_idle_timeout(self._trans, value)
+    raise Skipped()
+
+  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+                          doc="""
+The idle timeout of the connection (in milliseconds).
+""")
+
+  @property
+  def remote_idle_timeout(self):
+    #return pn_transport_get_remote_idle_timeout(self._trans)
+    raise Skipped()
+
+  @property
+  def frames_output(self):
+    #return pn_transport_get_frames_output(self._trans)
+    raise Skipped()
+
+  @property
+  def frames_input(self):
+    #return pn_transport_get_frames_input(self._trans)
+    raise Skipped()
+
 class Data(object):
 
   SYMBOL = None

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Wed Nov 14 20:11:37 2012
@@ -67,11 +67,14 @@ class Test(common.Test):
       t2.trace(Transport.TRACE_FRM)
     return c1, c2
 
-  def link(self, name, max_frame=None):
+  def link(self, name, max_frame=None, idle_timeout=None):
     c1, c2 = self.connection()
     if max_frame:
       c1._transport.max_frame_size = max_frame[0]
       c2._transport.max_frame_size = max_frame[1]
+    if idle_timeout:
+      c1._transport.idle_timeout = idle_timeout[0]
+      c2._transport.idle_timeout = idle_timeout[1]
     c1.open()
     c2.open()
     ssn1 = c1.session()
@@ -654,6 +657,113 @@ class MaxFrameTransferTest(Test):
     bytes = self.rcv.recv(1024)
     assert bytes == None
 
+class IdleTimeoutTest(Test):
+
+  def setup(self):
+    pass
+
+  def teardown(self):
+    self.cleanup()
+
+  def message(self, size):
+    parts = []
+    for i in range(size):
+      parts.append(str(i))
+    return "/".join(parts)[:size]
+
+  def testDefaults(self):
+    """
+    Verify the default value of the Connection idle timeout.
+    """
+
+    self.snd, self.rcv = self.link("test-link")
+    self.c1 = self.snd.session.connection
+    self.c2 = self.rcv.session.connection
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+    assert self.rcv.session.connection._transport.idle_timeout == 0
+    assert self.snd.session.connection._transport.idle_timeout == 0
+
+  def testGetSet(self):
+    """
+    Verify the configuration and negotiation of the idle timeout.
+    """
+
+    self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,2000])
+    self.c1 = self.snd.session.connection
+    self.c2 = self.rcv.session.connection
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+    assert self.rcv.session.connection._transport.idle_timeout == 2000
+    assert self.rcv.session.connection._transport.remote_idle_timeout == 1000
+    assert self.snd.session.connection._transport.idle_timeout == 1000
+    assert self.snd.session.connection._transport.remote_idle_timeout == 2000
+
+  def testTimeout(self):
+    """
+    Verify the AMQP Connection idle timeout.
+    """
+
+    # snd will timeout the Connection if no frame is received within 1000 ticks
+    self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,0])
+    self.c1 = self.snd.session.connection
+    self.c2 = self.rcv.session.connection
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+
+    t_snd = self.snd.session.connection._transport
+    t_rcv = self.rcv.session.connection._transport
+    assert t_rcv.idle_timeout == 0
+    assert t_rcv.remote_idle_timeout == 1000
+    assert t_snd.idle_timeout == 1000
+    assert t_snd.remote_idle_timeout == 0
+
+    sndr_frames_in = t_snd.frames_input
+    rcvr_frames_out = t_rcv.frames_output
+
+    # at t+1, nothing should happen:
+    clock = 1
+    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 501,  "deadline to send keepalive"
+    self.pump()
+    assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+    # at one tick from expected idle frame send, nothing should happen:
+    clock = 500
+    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 501,  "deadline to send keepalive"
+    self.pump()
+    assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+    # this should cause rcvr to expire and send a keepalive
+    clock = 502
+    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+    self.pump()
+    sndr_frames_in += 1
+    rcvr_frames_out += 1
+    assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+    assert rcvr_frames_out == t_rcv.frames_output, "unexpected frame"
+
+    # since a keepalive was received, sndr will rebase its clock against this tick:
+    # and the receiver should not change its deadline
+    clock = 503
+    assert t_snd.tick(clock) == 1503, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+    self.pump()
+    assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+    # now expire sndr
+    clock = 1504
+    t_snd.tick(clock)
+    try:
+      self.pump()
+      assert False, "Expected connection timeout did not happen!"
+    except TransportException:
+      pass
 
 class CreditTest(Test):
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org