You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apr.apache.org by yl...@apache.org on 2015/03/16 18:18:11 UTC

svn commit: r1667073 - in /apr/apr/trunk: CHANGES include/apr_queue.h test/testqueue.c util-misc/apr_queue.c

Author: ylavic
Date: Mon Mar 16 17:18:11 2015
New Revision: 1667073

URL: http://svn.apache.org/r1667073
Log:
apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
support timedout operations. PR 56951.

Submitted by: Travis Cross <tc+asf travislists com>
Reviewed/Modified: ylavic

Modified:
    apr/apr/trunk/CHANGES
    apr/apr/trunk/include/apr_queue.h
    apr/apr/trunk/test/testqueue.c
    apr/apr/trunk/util-misc/apr_queue.c

Modified: apr/apr/trunk/CHANGES
URL: http://svn.apache.org/viewvc/apr/apr/trunk/CHANGES?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/CHANGES [utf-8] (original)
+++ apr/apr/trunk/CHANGES [utf-8] Mon Mar 16 17:18:11 2015
@@ -1,6 +1,10 @@
                                                      -*- coding: utf-8 -*-
 Changes for APR 2.0.0
 
+  *) apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
+     support timedout operations. PR 56951.  [Travis Cross 
+     <tc+asf travislists com>, Yann Ylavic].
+
   *) apr_pollset: On z/OS, threadsafe apr_pollset_poll() may return
      "EDC8102I Operation would block" under load.
      [Pat Odonnell <patod us.ibm.com>]

Modified: apr/apr/trunk/include/apr_queue.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/apr_queue.h?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/include/apr_queue.h (original)
+++ apr/apr/trunk/include/apr_queue.h Mon Mar 16 17:18:11 2015
@@ -28,6 +28,7 @@
 #include "apu.h"
 #include "apr_errno.h"
 #include "apr_pools.h"
+#include "apr_time.h"
 
 #if APR_HAS_THREADS
 
@@ -91,7 +92,7 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
 APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data);
 
 /**
- * pop/get an object to the queue, returning immediately if the queue is empty
+ * pop/get an object from the queue, returning immediately if the queue is empty
  *
  * @param queue the queue
  * @param data the data
@@ -103,6 +104,38 @@ APR_DECLARE(apr_status_t) apr_queue_tryp
 APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data);
 
 /**
+ * push/add an object to the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is full
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout the timeout in microseconds (0: don't wait, < 0: no limit)
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is full and timeout is 0
+ * @returns APR_TIMEUP the queue is full and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful push
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+                                              apr_interval_time_t timeout);
+
+/**
+ * pop/get an object from the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout the timeout in microseconds (0: don't wait, < 0: no limit)
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is empty and timeout is 0
+ * @returns APR_TIMEUP the queue is empty and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful pop
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+                                             apr_interval_time_t timeout);
+
+/**
  * returns the size of the queue.
  *
  * @warning this is not threadsafe, and is intended for reporting/monitoring

Modified: apr/apr/trunk/test/testqueue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/test/testqueue.c?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/test/testqueue.c (original)
+++ apr/apr/trunk/test/testqueue.c Mon Mar 16 17:18:11 2015
@@ -121,6 +121,55 @@ static void test_queue_producer_consumer
     ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
 }
 
+static void test_queue_timeout(abts_case *tc, void *data)
+{
+    apr_queue_t *q;
+    apr_status_t rv;
+    apr_time_t start;
+    unsigned int i;
+    void *value;
+    /* Check timed push/pop: success with room/data, APR_TIMEUP without. */
+    rv = apr_queue_create(&q, 5, p);
+    ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    /* Five pushes (2 timed, 3 non-blocking) are all expected to succeed. */
+    for (i = 0; i < 2; ++i) {
+        rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    for (i = 0; i < 3; ++i) {
+        rv = apr_queue_trypush(q, NULL);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    /* The sixth push is expected to time out after waiting at least 1ms. */
+    start = apr_time_now();
+    rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+    ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+    ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+    /* A non-blocking push at this point is expected to report EAGAIN. */
+    rv = apr_queue_trypush(q, NULL);
+    ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+    /* Five pops (2 timed, 3 non-blocking) are all expected to succeed. */
+    for (i = 0; i < 2; ++i) {
+        rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1));
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    for (i = 0; i < 3; ++i) {
+        rv = apr_queue_trypop(q, &value);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    /* Sixth pop must time out; wait 1ms like the push case, not 1 second. */
+    start = apr_time_now();
+    rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1));
+    ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+    ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+    /* A non-blocking pop at this point is expected to report EAGAIN. */
+    rv = apr_queue_trypop(q, &value);
+    ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+
+    rv = apr_queue_term(q);
+    ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+}
+
 #endif /* APR_HAS_THREADS */
 
 abts_suite *testqueue(abts_suite *suite)
@@ -129,6 +178,7 @@ abts_suite *testqueue(abts_suite *suite)
 
 #if APR_HAS_THREADS
     abts_run_test(suite, test_queue_producer_consumer, NULL);
+    abts_run_test(suite, test_queue_timeout, NULL);
 #endif /* APR_HAS_THREADS */
 
     return suite;

Modified: apr/apr/trunk/util-misc/apr_queue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/util-misc/apr_queue.c?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/util-misc/apr_queue.c (original)
+++ apr/apr/trunk/util-misc/apr_queue.c Mon Mar 16 17:18:11 2015
@@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_crea
  * the push operation has completed, it signals other threads waiting
  * in apr_queue_pop() that they may continue consuming sockets.
  */
-APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+static apr_status_t queue_push(apr_queue_t *queue, void *data,
+                               apr_interval_time_t timeout)
 {
     apr_status_t rv;
 
@@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push
     }
 
     if (apr_queue_full(queue)) {
+        if (!timeout) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return APR_EAGAIN;
+        }
         if (!queue->terminated) {
             queue->full_waiters++;
-            rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+            if (timeout > 0) {
+                rv = apr_thread_cond_timedwait(queue->not_full,
+                                               queue->one_big_mutex,
+                                               timeout);
+            }
+            else {
+                rv = apr_thread_cond_wait(queue->not_full,
+                                          queue->one_big_mutex);
+            }
             queue->full_waiters--;
             if (rv != APR_SUCCESS) {
                 apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push
     return rv;
 }
 
+APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+{
+    return queue_push(queue, data, -1); /* timeout < 0: untimed cond wait */
+}
+
 /**
  * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
  * the push operation completes successfully, it signals other threads
@@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push
  */
 APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
 {
-    apr_status_t rv;
-
-    if (queue->terminated) {
-        return APR_EOF; /* no more elements ever again */
-    }
-
-    rv = apr_thread_mutex_lock(queue->one_big_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-
-    if (apr_queue_full(queue)) {
-        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
-        return APR_EAGAIN;
-    }
-    
-    queue->data[queue->in] = data;
-    queue->in++;
-    if (queue->in >= queue->bounds)
-        queue->in -= queue->bounds;
-    queue->nelts++;
-
-    if (queue->empty_waiters) {
-        Q_DBG("sig !empty", queue);
-        rv  = apr_thread_cond_signal(queue->not_empty);
-        if (rv != APR_SUCCESS) {
-            apr_thread_mutex_unlock(queue->one_big_mutex);
-            return rv;
-        }
-    }
+    return queue_push(queue, data, 0);
+}
 
-    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
-    return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+                                              apr_interval_time_t timeout)
+{
+    return queue_push(queue, data, timeout);
 }
 
 /**
@@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size
 
 /**
  * Retrieves the next item from the queue. If there are no
- * items available, it will block until one becomes available.
- * Once retrieved, the item is placed into the address specified by
- * 'data'.
+ * items available, it will either return APR_EAGAIN (timeout = 0),
+ * or block until one becomes available (infinitely with timeout < 0,
+ * otherwise until the given timeout expires). Once retrieved, the
+ * item is placed into the address specified by 'data'.
  */
-APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
+static apr_status_t queue_pop(apr_queue_t *queue, void **data,
+                              apr_interval_time_t timeout)
 {
     apr_status_t rv;
 
@@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
 
     /* Keep waiting until we wake up and find that the queue is not empty. */
     if (apr_queue_empty(queue)) {
+        if (!timeout) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return APR_EAGAIN;
+        }
         if (!queue->terminated) {
             queue->empty_waiters++;
-            rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
+            if (timeout > 0) {
+                rv = apr_thread_cond_timedwait(queue->not_empty,
+                                               queue->one_big_mutex,
+                                               timeout);
+            }
+            else {
+                rv = apr_thread_cond_wait(queue->not_empty,
+                                          queue->one_big_mutex);
+            }
             queue->empty_waiters--;
             if (rv != APR_SUCCESS) {
                 apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
     return rv;
 }
 
-/**
- * Retrieves the next item from the queue. If there are no
- * items available, return APR_EAGAIN.  Once retrieved,
- * the item is placed into the address specified by 'data'.
- */
-APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
 {
-    apr_status_t rv;
-
-    if (queue->terminated) {
-        return APR_EOF; /* no more elements ever again */
-    }
-
-    rv = apr_thread_mutex_lock(queue->one_big_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-
-    if (apr_queue_empty(queue)) {
-        (void)apr_thread_mutex_unlock(queue->one_big_mutex);
-        return APR_EAGAIN;
-    } 
-
-    *data = queue->data[queue->out];
-    queue->nelts--;
+    return queue_pop(queue, data, -1);
+}
 
-    queue->out++;
-    if (queue->out >= queue->bounds)
-        queue->out -= queue->bounds;
-    if (queue->full_waiters) {
-        Q_DBG("signal !full", queue);
-        rv = apr_thread_cond_signal(queue->not_full);
-        if (rv != APR_SUCCESS) {
-            apr_thread_mutex_unlock(queue->one_big_mutex);
-            return rv;
-        }
-    }
+APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+{
+    return queue_pop(queue, data, 0); /* timeout 0: APR_EAGAIN if empty */
+}
 
-    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
-    return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+                                             apr_interval_time_t timeout)
+{
+    return queue_pop(queue, data, timeout);
 }
 
 APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)