You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:30 UTC
[34/48] qpid-proton git commit: PROTON-1344: proactor timeout support
PROTON-1344: proactor timeout support
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2c8a3a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2c8a3a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2c8a3a3
Branch: refs/heads/go1
Commit: f2c8a3a38439c12d52dd93aed9173280c5754c9e
Parents: 25706a4
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 16 23:59:24 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:50 2016 -0500
----------------------------------------------------------------------
examples/c/proactor/libuv_proactor.c | 36 +++++++++++++++++++-
examples/c/proactor/send.c | 55 +++++++++++++++++++++++--------
examples/c/proactor/test.py | 8 +++++
3 files changed, 84 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 35afd5c..a26c311 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -157,6 +157,7 @@ struct pn_proactor_t {
uv_cond_t cond;
uv_loop_t loop;
uv_async_t async;
+ uv_timer_t timer;
/* Owner thread: proactor collector and batch can belong to leader or a worker */
pn_collector_t *collector;
@@ -168,8 +169,11 @@ struct pn_proactor_t {
queue worker_q;
queue leader_q;
size_t interrupt; /* pending interrupts */
+ pn_millis_t timeout;
size_t count; /* psocket count */
bool inactive:1;
+ bool timeout_request:1;
+ bool timeout_elapsed:1;
bool has_leader:1;
bool batch_working:1; /* batch belongs to a worker. */
};
@@ -551,6 +555,13 @@ static void on_write(uv_write_t* write, int err) {
pc->writing = 0; /* Need to send a new write request */
}
+static void on_timeout(uv_timer_t *timer) {
+ pn_proactor_t *p = (pn_proactor_t*)timer->data;
+ uv_mutex_lock(&p->lock);
+ p->timeout_elapsed = true;
+ uv_mutex_unlock(&p->lock);
+}
+
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
@@ -587,6 +598,7 @@ static void leader_rewatch(psocket_t *ps) {
}
}
+/* Set the event in the proactor's batch */
static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, pn_proactor__class(), p, t);
p->batch_working = true;
@@ -604,6 +616,10 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
--p->interrupt;
return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
}
+ if (p->timeout_elapsed) {
+ p->timeout_elapsed = false;
+ return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
+ }
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
if (ps->is_conn) {
@@ -676,6 +692,14 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
/* Lead till there is work to do. */
p->has_leader = true;
while (batch == NULL) {
+ if (p->timeout_request) {
+ p->timeout_request = false;
+ if (p->timeout) {
+ uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+ } else {
+ uv_timer_stop(&p->timer);
+ }
+ }
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
void (*action)(psocket_t*) = ps->action;
void (*wakeup)(psocket_t*) = ps->wakeup;
@@ -710,6 +734,14 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_unlock(&p->lock);
}
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+ uv_mutex_lock(&p->lock);
+ p->timeout = t;
+ p->timeout_request = true;
+ uv_async_send(&p->async); /* Interrupt the UV loop */
+ uv_mutex_unlock(&p->lock);
+}
+
int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
if (!pc) {
@@ -765,7 +797,9 @@ pn_proactor_t *pn_proactor() {
uv_loop_init(&p->loop);
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
- uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
+ uv_async_init(&p->loop, &p->async, NULL);
+ uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+ p->timer.data = p;
return p;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index d64ea2d..42facb0 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -44,6 +44,9 @@ typedef struct app_data_t {
int sent;
int acknowledged;
pn_proactor_t *proactor;
+ pn_millis_t delay;
+ bool delaying;
+ pn_link_t *sender;
bool finished;
} app_data_t;
@@ -91,6 +94,23 @@ static pn_bytes_t encode_message(app_data_t* app) {
return pn_bytes(mbuf.size, mbuf.start);
}
+static void send(app_data_t* app) {
+ while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
+ ++app->sent;
+ // Use sent counter bytes as unique delivery tag.
+ pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+ pn_bytes_t msgbuf = encode_message(app);
+ pn_link_send(app->sender, msgbuf.start, msgbuf.size);
+ pn_link_advance(app->sender);
+ if (app->delay && app->sent < app->message_count) {
+ /* If delay is set, wait for TIMEOUT event to send more */
+ app->delaying = true;
+ pn_proactor_set_timeout(app->proactor, app->delay);
+ break;
+ }
+ }
+}
+
static void handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
@@ -105,18 +125,24 @@ static void handle(app_data_t* app, pn_event_t* event) {
pn_link_open(l);
} break;
- case PN_LINK_FLOW: {
- /* The peer has given us some credit, now we can send messages */
- pn_link_t *sender = pn_event_link(event);
- while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
- ++app->sent;
- // Use sent counter bytes as unique delivery tag.
- pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
- pn_bytes_t msgbuf = encode_message(app);
- pn_link_send(sender, msgbuf.start, msgbuf.size);
- pn_link_advance(sender);
- }
- } break;
+ case PN_LINK_FLOW:
+ /* The peer has given us some credit, now we can send messages */
+ if (!app->delaying) {
+ app->sender = pn_event_link(event);
+ send(app);
+ }
+ break;
+
+ case PN_PROACTOR_TIMEOUT:
+ /* Wake the sender's connection */
+ pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+ break;
+
+ case PN_CONNECTION_WAKE:
+ /* Timeout, we can send more. */
+ app->delaying = false;
+ send(app);
+ break;
case PN_DELIVERY: {
/* We received acknowledgedment from the peer that a message was delivered. */
@@ -158,7 +184,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
}
static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0);
exit(1);
}
@@ -169,10 +195,11 @@ int main(int argc, char **argv) {
const char* urlstr = NULL;
int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
switch(opt) {
case 'a': urlstr = optarg; break;
case 'm': app.message_count = atoi(optarg); break;
+ case 'd': app.delay = atoi(optarg); break;
default: usage(argv[0]); break;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 5dc3a99..a86425d 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -48,5 +48,13 @@ class CExampleTest(BrokerTestCase):
self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
self.assertEqual(receive_expect(100), r.wait_out())
+ def test_timed_send(self):
+ """Send with timed delay"""
+ s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
+ self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
+ r = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
+ self.assertEqual(receive_expect(3), r.wait_out())
+
+
if __name__ == "__main__":
unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org