You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apr.apache.org by yl...@apache.org on 2015/03/16 18:18:11 UTC
svn commit: r1667073 - in /apr/apr/trunk: CHANGES include/apr_queue.h
test/testqueue.c util-misc/apr_queue.c
Author: ylavic
Date: Mon Mar 16 17:18:11 2015
New Revision: 1667073
URL: http://svn.apache.org/r1667073
Log:
apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
support timedout operations. PR 56951.
Submitted by: Travis Cross <tc+asf travislists com>
Reviewed/Modified: ylavic
Modified:
apr/apr/trunk/CHANGES
apr/apr/trunk/include/apr_queue.h
apr/apr/trunk/test/testqueue.c
apr/apr/trunk/util-misc/apr_queue.c
Modified: apr/apr/trunk/CHANGES
URL: http://svn.apache.org/viewvc/apr/apr/trunk/CHANGES?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/CHANGES [utf-8] (original)
+++ apr/apr/trunk/CHANGES [utf-8] Mon Mar 16 17:18:11 2015
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes for APR 2.0.0
+ *) apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
+ support timedout operations. PR 56951. [Travis Cross
+ <tc+asf travislists com>, Yann Ylavic].
+
*) apr_pollset: On z/OS, threadsafe apr_pollset_poll() may return
"EDC8102I Operation would block" under load.
[Pat Odonnell <patod us.ibm.com>]
Modified: apr/apr/trunk/include/apr_queue.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/apr_queue.h?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/include/apr_queue.h (original)
+++ apr/apr/trunk/include/apr_queue.h Mon Mar 16 17:18:11 2015
@@ -28,6 +28,7 @@
#include "apu.h"
#include "apr_errno.h"
#include "apr_pools.h"
+#include "apr_time.h"
#if APR_HAS_THREADS
@@ -91,7 +92,7 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data);
/**
- * pop/get an object to the queue, returning immediately if the queue is empty
+ * pop/get an object from the queue, returning immediately if the queue is empty
*
* @param queue the queue
* @param data the data
@@ -103,6 +104,38 @@ APR_DECLARE(apr_status_t) apr_queue_tryp
APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data);
/**
+ * push/add an object to the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is full
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout the maximum time to wait, in microseconds (0 returns
+ *        immediately, a negative value waits without a time limit)
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is full and timeout is 0
+ * @returns APR_TIMEUP the queue is full and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful push
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout);
+
+/**
+ * pop/get an object from the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is empty
+ *
+ * @param queue the queue
+ * @param data the data
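+ * @param timeout the maximum time to wait, in microseconds (0 returns
+ *        immediately, a negative value waits without a time limit)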
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is empty and timeout is 0
+ * @returns APR_TIMEUP the queue is empty and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful pop
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout);
+
+/**
* returns the size of the queue.
*
* @warning this is not threadsafe, and is intended for reporting/monitoring
Modified: apr/apr/trunk/test/testqueue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/test/testqueue.c?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/test/testqueue.c (original)
+++ apr/apr/trunk/test/testqueue.c Mon Mar 16 17:18:11 2015
@@ -121,6 +121,55 @@ static void test_queue_producer_consumer
ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
}
+/*
+ * Exercise the timed push/pop entry points on a queue created with a
+ * capacity of 5: fill it, check that a timed push on the full queue reports
+ * APR_TIMEUP (and a trypush APR_EAGAIN), then drain it and check the same
+ * two outcomes for pop on the empty queue.
+ */
+static void test_queue_timeout(abts_case *tc, void *data)
+{
+    apr_queue_t *q;
+    apr_status_t rv;
+    apr_time_t start;
+    unsigned int i;
+    void *value;
+
+    rv = apr_queue_create(&q, 5, p);
+    ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+
+    /* Fill the queue: 2 timed pushes + 3 trypushes = 5 elements. */
+    for (i = 0; i < 2; ++i) {
+        rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    for (i = 0; i < 3; ++i) {
+        rv = apr_queue_trypush(q, NULL);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+
+    /* Queue is full: a timed push must time out after at least 1 ms. */
+    start = apr_time_now();
+    rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+    ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+    ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+
+    rv = apr_queue_trypush(q, NULL);
+    ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+
+    /* Drain the queue: 2 timed pops + 3 trypops. */
+    for (i = 0; i < 2; ++i) {
+        rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1));
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    for (i = 0; i < 3; ++i) {
+        rv = apr_queue_trypop(q, &value);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+
+    /*
+     * Queue is empty: a timed pop must time out after at least 1 ms.
+     * The timeout matches the elapsed-time check below and the push case
+     * above, so the test does not stall for a whole second.
+     */
+    start = apr_time_now();
+    rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1));
+    ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+    ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+
+    rv = apr_queue_trypop(q, &value);
+    ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+
+    rv = apr_queue_term(q);
+    ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+}
+
#endif /* APR_HAS_THREADS */
abts_suite *testqueue(abts_suite *suite)
@@ -129,6 +178,7 @@ abts_suite *testqueue(abts_suite *suite)
#if APR_HAS_THREADS
abts_run_test(suite, test_queue_producer_consumer, NULL);
+ abts_run_test(suite, test_queue_timeout, NULL);
#endif /* APR_HAS_THREADS */
return suite;
Modified: apr/apr/trunk/util-misc/apr_queue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/util-misc/apr_queue.c?rev=1667073&r1=1667072&r2=1667073&view=diff
==============================================================================
--- apr/apr/trunk/util-misc/apr_queue.c (original)
+++ apr/apr/trunk/util-misc/apr_queue.c Mon Mar 16 17:18:11 2015
@@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_crea
* the push operation has completed, it signals other threads waiting
* in apr_queue_pop() that they may continue consuming sockets.
*/
-APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+static apr_status_t queue_push(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push
}
if (apr_queue_full(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->full_waiters++;
- rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_full,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_full,
+ queue->one_big_mutex);
+ }
queue->full_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push
return rv;
}
+APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+{
+ return queue_push(queue, data, -1);
+}
+
/**
* Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
* the push operation completes successfully, it signals other threads
@@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push
*/
APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_full(queue)) {
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- return APR_EAGAIN;
- }
-
- queue->data[queue->in] = data;
- queue->in++;
- if (queue->in >= queue->bounds)
- queue->in -= queue->bounds;
- queue->nelts++;
-
- if (queue->empty_waiters) {
- Q_DBG("sig !empty", queue);
- rv = apr_thread_cond_signal(queue->not_empty);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+ return queue_push(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
+{
+ return queue_push(queue, data, timeout);
}
/**
@@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size
/**
* Retrieves the next item from the queue. If there are no
- * items available, it will block until one becomes available.
- * Once retrieved, the item is placed into the address specified by
- * 'data'.
+ * items available, it will either return APR_EAGAIN (timeout = 0),
+ * or block until one becomes available (infinitely with timeout < 0,
+ * otherwise until the given timeout expires). Once retrieved, the
+ * item is placed into the address specified by 'data'.
*/
-APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
+static apr_status_t queue_pop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
/* Keep waiting until we wake up and find that the queue is not empty. */
if (apr_queue_empty(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->empty_waiters++;
- rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_empty,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_empty,
+ queue->one_big_mutex);
+ }
queue->empty_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(
return rv;
}
-/**
- * Retrieves the next item from the queue. If there are no
- * items available, return APR_EAGAIN. Once retrieved,
- * the item is placed into the address specified by 'data'.
- */
-APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_empty(queue)) {
- (void)apr_thread_mutex_unlock(queue->one_big_mutex);
- return APR_EAGAIN;
- }
-
- *data = queue->data[queue->out];
- queue->nelts--;
+ return queue_pop(queue, data, -1);
+}
- queue->out++;
- if (queue->out >= queue->bounds)
- queue->out -= queue->bounds;
- if (queue->full_waiters) {
- Q_DBG("signal !full", queue);
- rv = apr_thread_cond_signal(queue->not_full);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+{
+ return queue_pop(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
+{
+ return queue_pop(queue, data, timeout);
}
APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)