You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/11/14 21:11:39 UTC
svn commit: r1409359 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/ proton-c/src/dispatcher/
proton-c/src/engine/ proton-j/src/main/scripts/ tests/proton_tests/
Author: kgiusti
Date: Wed Nov 14 20:11:37 2012
New Revision: 1409359
URL: http://svn.apache.org/viewvc?rev=1409359&view=rev
Log:
PROTON-111: first pass at a solution
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/driver.h
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/include/proton/types.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-c/src/proton.c
qpid/proton/trunk/proton-c/src/util.c
qpid/proton/trunk/proton-c/src/util.h
qpid/proton/trunk/proton-j/src/main/scripts/proton.py
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Nov 14 20:11:37 2012
@@ -2140,6 +2140,7 @@ class Transport(object):
else:
return self._check(n)
+ # AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
return pn_transport_get_max_frame(self._trans)
@@ -2155,6 +2156,30 @@ Sets the maximum size for received frame
def remote_max_frame_size(self):
return pn_transport_get_remote_max_frame(self._trans)
+ # AMQP 1.0 idle-time-out
+ def _get_idle_timeout(self):
+ return pn_transport_get_idle_timeout(self._trans)
+
+ def _set_idle_timeout(self, value):
+ pn_transport_set_idle_timeout(self._trans, value)
+
+ idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+ doc="""
+The idle timeout of the connection (in milliseconds).
+""")
+
+ @property
+ def remote_idle_timeout(self):
+ return pn_transport_get_remote_idle_timeout(self._trans)
+
+ @property
+ def frames_output(self):
+ return pn_transport_get_frames_output(self._trans)
+
+ @property
+ def frames_input(self):
+ return pn_transport_get_frames_input(self._trans)
+
class SASLException(TransportException):
pass
Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Wed Nov 14 20:11:37 2012
@@ -121,6 +121,15 @@ pn_listener_t *pn_driver_listener(pn_dri
*/
pn_connector_t *pn_driver_connector(pn_driver_t *driver);
+/** Get the current time as an AMQP timestamp
+ *
+ * Returns the current local time in AMQP 1.0 format: milliseconds since Unix Epoch.
+ *
+ * @param[in] driver the driver
+ * @return timestamp
+ */
+pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver);
+
/** Free the driver allocated via pn_driver, and all associated
* listeners and connectors.
*
@@ -261,12 +270,17 @@ void pn_connector_trace(pn_connector_t *
/** Service the given connector.
*
- * Handle any inbound data, outbound data, or timing events pending on
- * the connector.
+ * Handle any inbound data, outbound data, or timing events pending on the connector. If
+ * there are any pending timeouts that will need to be processed in the future, then
+ * return the deadline for the next outstanding timeout.
*
* @param[in] connector the connector to process.
+ * @param[in] now the current time in msec offset from Unix Epoch (see AMQP 1.0)
+ *
+ * @return if non-zero: a deadline - pn_connector_process must be invoked again
+ * for the given connector at least once before this deadline expires.
*/
-void pn_connector_process(pn_connector_t *connector);
+pn_timestamp_t pn_connector_process(pn_connector_t *connector, pn_timestamp_t now);
/** Access the listener which opened this connector.
*
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Wed Nov 14 20:11:37 2012
@@ -275,12 +275,31 @@ void pn_connection_set_context(pn_connec
pn_error_t *pn_transport_error(pn_transport_t *transport);
ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
-time_t pn_transport_tick(pn_transport_t *transport, time_t now);
+/** Process any pending transport timer events.
+ *
+ * This method should be called after all pending input has been processed by the
+ * transport (see ::pn_transport_input), and before generating output (see
+ * ::pn_transport_output). It returns the deadline for the next pending timer event, if
+ * any are present.
+ *
+ * @param[in] transport the transport to process.
+ * @param[in] now the current time in milliseconds, on the same timebase as the returned deadline.
+ *
+ * @return if non-zero, then the expiration time of the next pending timer event for the
+ * transport. The caller must invoke pn_transport_tick again prior to the expiration of
+ * this deadline.
+ */
+pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now);
void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace);
// max frame of zero means "unlimited"
uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);
uint32_t pn_transport_get_remote_max_frame(pn_transport_t *transport);
+/* timeout of zero means "no timeout" */
+pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport);
+void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout);
+pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
+uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
+uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
void pn_transport_free(pn_transport_t *transport);
// session
Modified: qpid/proton/trunk/proton-c/include/proton/types.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/types.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/types.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/types.h Wed Nov 14 20:11:37 2012
@@ -32,7 +32,7 @@ extern "C" {
typedef int32_t pn_sequence_t;
typedef uint32_t pn_millis_t;
typedef uint32_t pn_seconds_t;
-typedef uint64_t pn_timestamp_t;
+typedef int64_t pn_timestamp_t;
typedef uint32_t pn_char_t;
typedef uint32_t pn_decimal32_t;
typedef uint64_t pn_decimal64_t;
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Wed Nov 14 20:11:37 2012
@@ -115,6 +115,12 @@ void pn_dispatcher_trace(pn_dispatcher_t
int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
{
+ if (frame.size == 0) { // ignore null frames
+ if (disp->trace & PN_TRACE_FRM)
+ pn_dispatcher_trace(disp, frame.channel, "<- (EMPTY FRAME)\n");
+ return 0;
+ }
+
ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
if (dsize < 0) {
fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
@@ -182,6 +188,7 @@ ssize_t pn_dispatcher_input(pn_dispatche
size_t n = pn_read_frame(&frame, bytes + read, available - read);
if (n) {
+ disp->input_frames_ct += 1;
int e = pn_dispatch_frame(disp, frame);
if (e) return e;
read += n;
@@ -272,6 +279,7 @@ int pn_post_frame(pn_dispatcher_t *disp,
disp->capacity *= 2;
disp->output = (char *) realloc(disp->output, disp->capacity);
}
+ disp->output_frames_ct += 1;
if (disp->trace & PN_TRACE_RAW) {
fprintf(stderr, "RAW: \"");
pn_fprint_data(stderr, disp->output + disp->available, n);
@@ -374,6 +382,7 @@ int pn_post_transfer_frame(pn_dispatcher
disp->capacity *= 2;
disp->output = realloc(disp->output, disp->capacity);
}
+ disp->output_frames_ct += 1;
if (disp->trace & PN_TRACE_RAW) {
fprintf(stderr, "RAW: \"");
pn_fprint_data(stderr, disp->output + disp->available, n);
@@ -385,4 +394,3 @@ int pn_post_transfer_frame(pn_dispatcher
disp->output_payload = NULL;
return 0;
}
-
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Wed Nov 14 20:11:37 2012
@@ -52,11 +52,13 @@ struct pn_dispatcher_t {
size_t remote_max_frame;
pn_buffer_t *frame; // frame under construction
size_t capacity;
- size_t available;
+ size_t available; /* number of raw bytes pending output */
char *output;
void *context;
bool halt;
bool batch;
+ uint64_t output_frames_ct;
+ uint64_t input_frames_ct;
char scratch[SCRATCH];
};
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Wed Nov 14 20:11:37 2012
@@ -30,11 +30,13 @@
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
+#include <sys/time.h>
#include <proton/driver.h>
#include <proton/error.h>
#include <proton/sasl.h>
#include <proton/ssl.h>
+#include <proton/util.h>
#include "util.h"
#include "ssl/ssl-internal.h"
@@ -90,7 +92,6 @@ struct pn_connector_t {
time_t wakeup;
void (*read)(pn_connector_t *);
void (*write) (pn_connector_t *);
- time_t (*tick)(pn_connector_t *sel, time_t now);
size_t input_size;
char input[IO_BUF_SIZE];
bool input_eos;
@@ -342,7 +343,6 @@ pn_connector_t *pn_connector(pn_driver_t
static void pn_connector_read(pn_connector_t *ctor);
static void pn_connector_write(pn_connector_t *ctor);
-static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
{
@@ -365,7 +365,6 @@ pn_connector_t *pn_connector_fd(pn_drive
c->wakeup = 0;
c->read = pn_connector_read;
c->write = pn_connector_write;
- c->tick = pn_connector_tick;
c->input_size = 0;
c->input_eos = false;
c->output_size = 0;
@@ -570,31 +569,28 @@ static void pn_connector_write(pn_connec
ctor->status &= ~PN_SEL_WR;
}
-static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
+static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
- // XXX: should probably have a function pointer for this and switch it with different layers
- time_t result = pn_transport_tick(ctor->transport, now);
- pn_connector_process_input(ctor);
- pn_connector_process_output(ctor);
- return result;
+ return pn_transport_tick(ctor->transport, now);
}
-void pn_connector_process(pn_connector_t *c) {
- if (c) {
- if (c->closed) return;
+pn_timestamp_t pn_connector_process(pn_connector_t *c, pn_timestamp_t now)
+{
+ pn_timestamp_t next_timeout = 0;
- if (c->pending_tick) {
- // XXX: should handle timing also
- c->tick(c, 0);
- c->pending_tick = false;
- }
+ if (c) {
+ if (c->closed) return 0;
if (c->pending_read) {
c->read(c);
c->pending_read = false;
}
pn_connector_process_input(c);
+
+ next_timeout = pn_connector_tick(c, now);
+ c->pending_tick = (next_timeout != 0);
+
pn_connector_process_output(c);
if (c->pending_write) {
c->write(c);
@@ -606,8 +602,10 @@ void pn_connector_process(pn_connector_t
fprintf(stderr, "Closed %s\n", c->name);
}
pn_connector_close(c);
+ return 0;
}
}
+ return next_timeout;
}
// driver
@@ -820,3 +818,12 @@ pn_connector_t *pn_driver_connector(pn_d
return NULL;
}
+
+
+pn_timestamp_t pn_driver_timestamp(pn_driver_t *driver) // current wall-clock time in msec since the Unix Epoch; the driver argument is not read here
+{
+ struct timeval now;
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n"); // non-zero return: the clock could not be read
+
+ return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_usec / 1000); // widen seconds to pn_timestamp_t before scaling; usec -> msec truncates
+}
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed Nov 14 20:11:37 2012
@@ -99,6 +99,7 @@ typedef struct {
struct pn_transport_t {
ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+ pn_timestamp_t (*process_tick)(pn_transport_t *, pn_timestamp_t);
size_t header_count;
pn_sasl_t *sasl;
pn_ssl_t *ssl;
@@ -114,6 +115,17 @@ struct pn_transport_t {
pn_data_t *remote_desired_capabilities;
uint32_t local_max_frame;
uint32_t remote_max_frame;
+
+ /* dead remote detection */
+ pn_millis_t local_idle_timeout;
+ pn_timestamp_t dead_remote_deadline;
+ uint64_t last_bytes_input;
+
+ /* keepalive */
+ pn_millis_t remote_idle_timeout;
+ pn_timestamp_t keepalive_deadline;
+ uint64_t last_bytes_output;
+
pn_error_t *error;
pn_session_state_t *sessions;
size_t session_capacity;
@@ -121,6 +133,10 @@ struct pn_transport_t {
size_t channel_capacity;
const char *condition;
char scratch[SCRATCH];
+
+ /* statistics */
+ uint64_t bytes_input;
+ uint64_t bytes_output;
};
struct pn_connection_t {
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed Nov 14 20:11:37 2012
@@ -735,11 +735,13 @@ static ssize_t pn_output_write_sasl_head
static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now);
void pn_transport_init(pn_transport_t *transport)
{
transport->process_input = pn_input_read_amqp_header;
transport->process_output = pn_output_write_amqp_header;
+ transport->process_tick = NULL;
transport->header_count = 0;
transport->sasl = NULL;
transport->ssl = NULL;
@@ -763,6 +765,12 @@ void pn_transport_init(pn_transport_t *t
transport->remote_hostname = NULL;
transport->local_max_frame = 0;
transport->remote_max_frame = 0;
+ transport->local_idle_timeout = 0;
+ transport->dead_remote_deadline = 0;
+ transport->last_bytes_input = 0;
+ transport->remote_idle_timeout = 0;
+ transport->keepalive_deadline = 0;
+ transport->last_bytes_output = 0;
transport->remote_offered_capabilities = pn_data(16);
transport->remote_desired_capabilities = pn_data(16);
transport->error = pn_error();
@@ -774,6 +782,9 @@ void pn_transport_init(pn_transport_t *t
transport->channel_capacity = 0;
transport->condition = NULL;
+
+ transport->bytes_input = 0;
+ transport->bytes_output = 0;
}
pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -1331,9 +1342,10 @@ int pn_do_open(pn_dispatcher_t *disp)
pn_bytes_t remote_container, remote_hostname;
pn_data_clear(transport->remote_offered_capabilities);
pn_data_clear(transport->remote_desired_capabilities);
- int err = pn_scan_args(disp, "D.[?S?SI....CC]", &container_q,
+ int err = pn_scan_args(disp, "D.[?S?SI.I..CC]", &container_q,
&remote_container, &hostname_q, &remote_hostname,
&transport->remote_max_frame,
+ &transport->remote_idle_timeout,
transport->remote_offered_capabilities,
transport->remote_desired_capabilities);
if (err) return err;
@@ -1362,6 +1374,8 @@ int pn_do_open(pn_dispatcher_t *disp)
} else {
transport->disp->halt = true;
}
+ if (transport->remote_idle_timeout)
+ transport->process_tick = pn_process_tick; // enable timeouts
transport->open_rcvd = true;
return 0;
}
@@ -1758,6 +1772,7 @@ ssize_t pn_transport_input(pn_transport_
}
}
+ transport->bytes_input += consumed;
return consumed;
}
@@ -1839,6 +1854,43 @@ static ssize_t pn_input_read_amqp(pn_tra
}
}
+/* process AMQP related timer events: check the local idle timeout, emit keepalives for the remote idle timeout, and return the earliest pending deadline (0 if none) */
+static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now)
+{
+ pn_timestamp_t timeout = 0; // 0 == no deadline pending
+
+ if (transport->local_idle_timeout) { // dead-remote detection is only active when a local timeout is configured
+ if (transport->dead_remote_deadline == 0 ||
+ transport->last_bytes_input != transport->bytes_input) { // first tick, or bytes arrived since the previous tick: restart the timer
+ transport->dead_remote_deadline = now + transport->local_idle_timeout;
+ transport->last_bytes_input = transport->bytes_input;
+ } else if (transport->dead_remote_deadline <= now) { // deadline passed with no new input bytes
+ transport->dead_remote_deadline = now + transport->local_idle_timeout;
+ // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
+ pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+ }
+ timeout = transport->dead_remote_deadline;
+ }
+
+ // Prevent remote idle timeout as described by AMQP 1.0:
+ if (transport->remote_idle_timeout && !transport->close_sent) { // skipped once close has been sent
+ if (transport->keepalive_deadline == 0 ||
+ transport->last_bytes_output != transport->bytes_output) { // first tick, or bytes were written since the previous tick: restart the timer
+ transport->keepalive_deadline = now + (transport->remote_idle_timeout/2.0); // keepalive is due at half the remote's timeout
+ transport->last_bytes_output = transport->bytes_output;
+ } else if (transport->keepalive_deadline <= now) { // nothing written for half the remote's timeout
+ transport->keepalive_deadline = now + (transport->remote_idle_timeout/2.0);
+ if (transport->disp->available == 0) { // no outbound data ready
+ pn_post_frame(transport->disp, 0, ""); // so send empty frame
+ transport->last_bytes_output += 8; // and account for it! (presumably the 8-byte size of an empty frame - TODO confirm)
+ }
+ }
+ timeout = pn_timestamp_next_expire( timeout, transport->keepalive_deadline ); // earlier of the dead-remote and keepalive deadlines
+ }
+
+ return timeout;
+}
+
bool pn_delivery_buffered(pn_delivery_t *delivery)
{
if (delivery->settled) return false;
@@ -1861,11 +1913,12 @@ int pn_process_conn_setup(pn_transport_t
if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
{
pn_connection_t *connection = (pn_connection_t *) endpoint;
- int err = pn_post_frame(transport->disp, 0, "DL[SS?InnnnCC]", OPEN,
+ int err = pn_post_frame(transport->disp, 0, "DL[SS?In?InnCC]", OPEN,
connection->container,
connection->hostname,
- // if not zero, advertise our max frame size
+ // if not zero, advertise our max frame size and idle timeout
(bool)transport->local_max_frame, transport->local_max_frame,
+ (bool)transport->local_idle_timeout, transport->local_idle_timeout,
connection->offered_capabilities,
connection->desired_capabilities);
if (err) return err;
@@ -2421,7 +2474,7 @@ ssize_t pn_transport_output(pn_transport
break;
} else if (n == PN_EOS) {
if (total > 0) {
- return total;
+ break;
} else {
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2435,6 +2488,7 @@ ssize_t pn_transport_output(pn_transport
}
}
+ transport->bytes_output += total;
return total;
}
@@ -2462,6 +2516,22 @@ uint32_t pn_transport_get_remote_max_fra
return transport->remote_max_frame;
}
+pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport) // local idle timeout; 0 (the value set by pn_transport_init) means "no timeout"
+{
+ return transport->local_idle_timeout;
+}
+
+void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout) // set the local idle timeout; 0 means "no timeout"
+{
+ transport->local_idle_timeout = timeout;
+ transport->process_tick = pn_process_tick; // enable tick processing; NOTE(review): installed even when timeout is 0, in which case pn_process_tick skips the local check
+}
+
+pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport) // idle timeout read from the peer's open frame by pn_do_open; 0 until then
+{
+ return transport->remote_idle_timeout;
+}
+
void pn_link_offered(pn_link_t *sender, int credit)
{
sender->available = credit;
@@ -2520,8 +2590,24 @@ void pn_link_drain(pn_link_t *receiver,
}
}
-time_t pn_transport_tick(pn_transport_t *engine, time_t now)
+pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
+{
+ if (transport && transport->process_tick)
+ return transport->process_tick( transport, now );
+ return 0;
+}
+
+uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
+{
+ if (transport && transport->disp)
+ return transport->disp->output_frames_ct;
+ return 0;
+}
+
+uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
{
+ if (transport && transport->disp)
+ return transport->disp->input_frames_ct;
return 0;
}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Nov 14 20:11:37 2012
@@ -28,7 +28,6 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
-#include <sys/time.h>
#include <uuid/uuid.h>
#include "util.h"
@@ -472,21 +471,26 @@ static long int millis(struct timeval tv
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
+ pn_timestamp_t ctor_tick = 0;
+ pn_timestamp_t now = pn_driver_timestamp(messenger->driver);
+ pn_timestamp_t deadline = (timeout < 0) ? 0 : now + timeout;
+
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
- pn_connector_process(ctor);
+ ctor_tick = pn_timestamp_next_expire( pn_connector_process(ctor, now), ctor_tick );
ctor = pn_connector_next(ctor);
}
- struct timeval now;
- if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
- long int deadline = millis(now) + timeout;
bool pred;
-
- while (true) {
+ do {
pred = predicate(messenger);
- int remaining = deadline - millis(now);
- if (pred || (timeout >= 0 && remaining < 0)) break;
+ if (pred) break;
+
+ int remaining = -1;
+ pn_timestamp_t next_deadline = pn_timestamp_next_expire(deadline, ctor_tick);
+ if (next_deadline) {
+ remaining = (next_deadline > now) ? next_deadline - now : 0;
+ }
int error = pn_driver_wait(messenger->driver, remaining);
if (error)
@@ -518,9 +522,11 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connector_set_connection(c, conn);
}
+ now = pn_driver_timestamp(messenger->driver);
+ ctor_tick = 0;
pn_connector_t *c;
while ((c = pn_driver_connector(messenger->driver))) {
- pn_connector_process(c);
+ (void) pn_connector_process(c, now); // timeout ignored - we call it again below
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn);
if (pn_connector_closed(c)) {
@@ -529,14 +535,11 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_decref(conn);
pn_messenger_flow(messenger);
} else {
- pn_connector_process(c);
+ ctor_tick = pn_timestamp_next_expire( pn_connector_process(c, now), ctor_tick );
}
}
-
- if (timeout >= 0) {
- if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
- }
- }
+ now = pn_driver_timestamp(messenger->driver);
+ } while (!deadline || deadline > now);
return pred ? 0 : PN_TIMEOUT;
}
Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Wed Nov 14 20:11:37 2012
@@ -453,6 +453,10 @@ int main(int argc, char **argv)
parse_url(url, &scheme, &user, &pass, &host, &port, &path);
+ pn_timestamp_t c_deadline = 0;
+ pn_timestamp_t now;
+ int timeout;
+
pn_driver_t *drv = pn_driver();
if (url) {
struct client_context ctx = {false, false, count, count, drv, quiet, size, high, low};
@@ -465,24 +469,36 @@ int main(int argc, char **argv)
if (!ctor) pn_fatal("connector failed\n");
pn_connector_set_connection(ctor, pn_connection());
while (!ctx.done) {
- pn_driver_wait(drv, -1);
+ timeout = -1;
+ now = pn_driver_timestamp(drv);
+ if (c_deadline) {
+ timeout = (c_deadline > now) ? c_deadline - now : 0;
+ }
+ pn_driver_wait(drv, timeout);
pn_connector_t *c;
while ((c = pn_driver_connector(drv))) {
- pn_connector_process(c);
+ (void) pn_connector_process(c, now);
client_callback(c);
if (pn_connector_closed(c)) {
pn_connection_free(pn_connector_connection(c));
pn_connector_free(c);
} else {
- pn_connector_process(c);
+ c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
+ c_deadline);
}
}
}
} else {
struct server_context ctx = {0, quiet, size};
if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
+ pn_timestamp_t c_deadline = 0;
while (true) {
- pn_driver_wait(drv, -1);
+ timeout = -1;
+ now = pn_driver_timestamp(drv);
+ if (c_deadline) {
+ timeout = (c_deadline > now) ? c_deadline - now : 0;
+ }
+ pn_driver_wait(drv, timeout);
pn_listener_t *l;
pn_connector_t *c;
@@ -492,13 +508,14 @@ int main(int argc, char **argv)
}
while ((c = pn_driver_connector(drv))) {
- pn_connector_process(c);
+ (void) pn_connector_process(c, now);
server_callback(c);
if (pn_connector_closed(c)) {
pn_connection_free(pn_connector_connection(c));
pn_connector_free(c);
} else {
- pn_connector_process(c);
+ c_deadline = pn_timestamp_next_expire(pn_connector_process(c, now),
+ c_deadline);
}
}
}
Modified: qpid/proton/trunk/proton-c/src/util.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.c?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.c (original)
+++ qpid/proton/trunk/proton-c/src/util.c Wed Nov 14 20:11:37 2012
@@ -30,6 +30,8 @@
#include <strings.h> // For non C89/C99 strcasecmp
#include <proton/error.h>
#include <proton/util.h>
+#include <proton/types.h>
+#include "util.h"
ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size)
{
@@ -165,3 +167,12 @@ char *pn_strndup(const char *src, size_t
return NULL;
}
}
+
+// Return whichever of two deadlines expires first; a zero argument means "not set" and is ignored, so zero is returned only when neither is set
+pn_timestamp_t pn_timestamp_next_expire( pn_timestamp_t a, pn_timestamp_t b )
+{
+ if (a && b) return pn_min(a, b); // both set: the earlier one wins
+ if (a) return a; // only a is set
+ return b; // only b is set, or neither (b == 0)
+}
+
Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Wed Nov 14 20:11:37 2012
@@ -28,11 +28,13 @@
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
+#include <proton/types.h>
ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size);
void pn_fprint_data(FILE *stream, const char *bytes, size_t size);
void pn_print_data(const char *bytes, size_t size);
bool pn_env_bool(const char *name);
+pn_timestamp_t pn_timestamp_next_expire(pn_timestamp_t a, pn_timestamp_t b);
#define DIE_IFR(EXPR, STRERR) \
do { \
Modified: qpid/proton/trunk/proton-j/src/main/scripts/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/scripts/proton.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/src/main/scripts/proton.py Wed Nov 14 20:11:37 2012
@@ -412,6 +412,35 @@ Sets the maximum size for received frame
#return pn_transport_get_remote_max_frame(self._trans)
raise Skipped()
+ # AMQP 1.0 idle-time-out
+ def _get_idle_timeout(self):
+ #return pn_transport_get_idle_timeout(self._trans)
+ raise Skipped()
+
+ def _set_idle_timeout(self, value):
+ #pn_transport_set_idle_timeout(self._trans, value)
+ raise Skipped()
+
+ idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+ doc="""
+The idle timeout of the connection (in milliseconds).
+""")
+
+ @property
+ def remote_idle_timeout(self):
+ #return pn_transport_get_remote_idle_timeout(self._trans)
+ raise Skipped()
+
+ @property
+ def frames_output(self):
+ #return pn_transport_get_frames_output(self._trans)
+ raise Skipped()
+
+ @property
+ def frames_input(self):
+ #return pn_transport_get_frames_input(self._trans)
+ raise Skipped()
+
class Data(object):
SYMBOL = None
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1409359&r1=1409358&r2=1409359&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Wed Nov 14 20:11:37 2012
@@ -67,11 +67,14 @@ class Test(common.Test):
t2.trace(Transport.TRACE_FRM)
return c1, c2
- def link(self, name, max_frame=None):
+ def link(self, name, max_frame=None, idle_timeout=None):
c1, c2 = self.connection()
if max_frame:
c1._transport.max_frame_size = max_frame[0]
c2._transport.max_frame_size = max_frame[1]
+ if idle_timeout:
+ c1._transport.idle_timeout = idle_timeout[0]
+ c2._transport.idle_timeout = idle_timeout[1]
c1.open()
c2.open()
ssn1 = c1.session()
@@ -654,6 +657,113 @@ class MaxFrameTransferTest(Test):
bytes = self.rcv.recv(1024)
assert bytes == None
+class IdleTimeoutTest(Test):
+
+ def setup(self):
+ pass
+
+ def teardown(self):
+ self.cleanup()
+
+ def message(self, size):
+ parts = []
+ for i in range(size):
+ parts.append(str(i))
+ return "/".join(parts)[:size]
+
+ def testDefaults(self):
+ """
+ Verify the default value of the Connection idle timeout.
+ """
+
+ self.snd, self.rcv = self.link("test-link")
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+ assert self.rcv.session.connection._transport.idle_timeout == 0
+ assert self.snd.session.connection._transport.idle_timeout == 0
+
+ def testGetSet(self):
+ """
+ Verify the configuration and negotiation of the idle timeout.
+ """
+
+ self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,2000])
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+ assert self.rcv.session.connection._transport.idle_timeout == 2000
+ assert self.rcv.session.connection._transport.remote_idle_timeout == 1000
+ assert self.snd.session.connection._transport.idle_timeout == 1000
+ assert self.snd.session.connection._transport.remote_idle_timeout == 2000
+
+ def testTimeout(self):
+ """
+ Verify the AMQP Connection idle timeout.
+ """
+
+ # snd will time out the Connection if no frame is received within 1000 ticks
+ self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,0])
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+
+ t_snd = self.snd.session.connection._transport
+ t_rcv = self.rcv.session.connection._transport
+ assert t_rcv.idle_timeout == 0
+ assert t_rcv.remote_idle_timeout == 1000
+ assert t_snd.idle_timeout == 1000
+ assert t_snd.remote_idle_timeout == 0
+
+ sndr_frames_in = t_snd.frames_input
+ rcvr_frames_out = t_rcv.frames_output
+
+ # at t+1, nothing should happen:
+ clock = 1
+ assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+ assert t_rcv.tick(clock) == 501, "deadline to send keepalive"
+ self.pump()
+ assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+ # one tick before the expected keepalive is due, nothing should happen:
+ clock = 500
+ assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+ assert t_rcv.tick(clock) == 501, "deadline to send keepalive"
+ self.pump()
+ assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+ # this should cause rcvr to expire and send a keepalive
+ clock = 502
+ assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
+ assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+ self.pump()
+ sndr_frames_in += 1
+ rcvr_frames_out += 1
+ assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+ assert rcvr_frames_out == t_rcv.frames_output, "unexpected frame"
+
+ # since a keepalive was received, sndr will rebase its deadline against this tick,
+ # and the receiver should not change its deadline
+ clock = 503
+ assert t_snd.tick(clock) == 1503, "deadline for remote timeout"
+ assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+ self.pump()
+ assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
+
+ # now expire sndr
+ clock = 1504
+ t_snd.tick(clock)
+ try:
+ self.pump()
+ assert False, "Expected connection timeout did not happen!"
+ except TransportException:
+ pass
class CreditTest(Test):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org