You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:30 UTC

[34/48] qpid-proton git commit: PROTON-1344: proactor timeout support

PROTON-1344: proactor timeout support


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2c8a3a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2c8a3a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2c8a3a3

Branch: refs/heads/go1
Commit: f2c8a3a38439c12d52dd93aed9173280c5754c9e
Parents: 25706a4
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 16 23:59:24 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:50 2016 -0500

----------------------------------------------------------------------
 examples/c/proactor/libuv_proactor.c | 36 +++++++++++++++++++-
 examples/c/proactor/send.c           | 55 +++++++++++++++++++++++--------
 examples/c/proactor/test.py          |  8 +++++
 3 files changed, 84 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 35afd5c..a26c311 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -157,6 +157,7 @@ struct pn_proactor_t {
   uv_cond_t cond;
   uv_loop_t loop;
   uv_async_t async;
+  uv_timer_t timer;
 
   /* Owner thread: proactor collector and batch can belong to leader or a worker */
   pn_collector_t *collector;
@@ -168,8 +169,11 @@ struct pn_proactor_t {
   queue worker_q;
   queue leader_q;
   size_t interrupt;             /* pending interrupts */
+  pn_millis_t timeout;
   size_t count;                 /* psocket count */
   bool inactive:1;
+  bool timeout_request:1;
+  bool timeout_elapsed:1;
   bool has_leader:1;
   bool batch_working:1;          /* batch belongs to a worker.  */
 };
@@ -551,6 +555,13 @@ static void on_write(uv_write_t* write, int err) {
   pc->writing = 0;              /* Need to send a new write request */
 }
 
+static void on_timeout(uv_timer_t *timer) {
+  pn_proactor_t *p = (pn_proactor_t*)timer->data;
+  uv_mutex_lock(&p->lock);
+  p->timeout_elapsed = true;
+  uv_mutex_unlock(&p->lock);
+}
+
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
@@ -587,6 +598,7 @@ static void leader_rewatch(psocket_t *ps) {
   }
 }
 
+/* Set the event in the proactor's batch  */
 static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
   pn_collector_put(p->collector, pn_proactor__class(), p, t);
   p->batch_working = true;
@@ -604,6 +616,10 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
       --p->interrupt;
       return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
     }
+    if (p->timeout_elapsed) {
+      p->timeout_elapsed = false;
+      return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
+    }
   }
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
     if (ps->is_conn) {
@@ -676,6 +692,14 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
     /* Lead till there is work to do. */
     p->has_leader = true;
     while (batch == NULL) {
+      if (p->timeout_request) {
+        p->timeout_request = false;
+        if (p->timeout) {
+          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+        } else {
+          uv_timer_stop(&p->timer);
+        }
+      }
       for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
         void (*action)(psocket_t*) = ps->action;
         void (*wakeup)(psocket_t*) = ps->wakeup;
@@ -710,6 +734,14 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
   uv_mutex_unlock(&p->lock);
 }
 
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+  uv_mutex_lock(&p->lock);
+  p->timeout = t;
+  p->timeout_request = true;
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
+  uv_mutex_unlock(&p->lock);
+}
+
 int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
   pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
   if (!pc) {
@@ -765,7 +797,9 @@ pn_proactor_t *pn_proactor() {
   uv_loop_init(&p->loop);
   uv_mutex_init(&p->lock);
   uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
+  uv_async_init(&p->loop, &p->async, NULL);
+  uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+  p->timer.data = p;
   return p;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index d64ea2d..42facb0 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -44,6 +44,9 @@ typedef struct app_data_t {
   int sent;
   int acknowledged;
   pn_proactor_t *proactor;
+  pn_millis_t delay;
+  bool delaying;
+  pn_link_t *sender;
   bool finished;
 } app_data_t;
 
@@ -91,6 +94,23 @@ static pn_bytes_t encode_message(app_data_t* app) {
   return pn_bytes(mbuf.size, mbuf.start);
 }
 
+static void send(app_data_t* app) {
+  while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
+    ++app->sent;
+    // Use sent counter bytes as unique delivery tag.
+    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+    pn_bytes_t msgbuf = encode_message(app);
+    pn_link_send(app->sender, msgbuf.start, msgbuf.size);
+    pn_link_advance(app->sender);
+    if (app->delay && app->sent < app->message_count) {
+      /* If delay is set, wait for TIMEOUT event to send more */
+      app->delaying = true;
+      pn_proactor_set_timeout(app->proactor, app->delay);
+      break;
+    }
+  }
+}
+
 static void handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
@@ -105,18 +125,24 @@ static void handle(app_data_t* app, pn_event_t* event) {
      pn_link_open(l);
    } break;
 
-   case PN_LINK_FLOW: {
-     /* The peer has given us some credit, now we can send messages */
-     pn_link_t *sender = pn_event_link(event);
-     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
-       ++app->sent;
-       // Use sent counter bytes as unique delivery tag.
-       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       pn_link_advance(sender);
-     }
-   } break;
+   case PN_LINK_FLOW:
+    /* The peer has given us some credit, now we can send messages */
+    if (!app->delaying) {
+      app->sender = pn_event_link(event);
+      send(app);
+    }
+    break;
+
+   case PN_PROACTOR_TIMEOUT:
+    /* Wake the sender's connection */
+    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+    break;
+
+   case PN_CONNECTION_WAKE:
+    /* Timeout, we can send more. */
+    app->delaying = false;
+    send(app);
+    break;
 
    case PN_DELIVERY: {
      /* We received acknowledgedment from the peer that a message was delivered. */
@@ -158,7 +184,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
 }
 
 static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+  fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0);
   exit(1);
 }
 
@@ -169,10 +195,11 @@ int main(int argc, char **argv) {
   const char* urlstr = NULL;
 
   int opt;
-  while((opt = getopt(argc, argv, "a:m:")) != -1) {
+  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
     switch(opt) {
      case 'a': urlstr = optarg; break;
      case 'm': app.message_count = atoi(optarg); break;
+     case 'd': app.delay = atoi(optarg); break;
      default: usage(argv[0]); break;
     }
   }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 5dc3a99..a86425d 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -48,5 +48,13 @@ class CExampleTest(BrokerTestCase):
         self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
         self.assertEqual(receive_expect(100), r.wait_out())
 
+    def test_timed_send(self):
+        """Send with timed delay"""
+        s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
+        self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
+        r = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
+        self.assertEqual(receive_expect(3), r.wait_out())
+
+
 if __name__ == "__main__":
     unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org