You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by xi...@apache.org on 2021/07/28 04:01:48 UTC
[incubator-nuttx] 04/04: usrwqueue: implement order work queue
This is an automated email from the ASF dual-hosted git repository.
xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git
commit 23d87ff9df116f0f43465a13adc2fbe720fc3466
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Mon Jul 26 10:36:13 2021 +0800
usrwqueue: implement order work queue
Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
include/nuttx/semaphore.h | 2 +-
libs/libc/wqueue/Make.defs | 2 +-
libs/libc/wqueue/work_cancel.c | 49 +++++++++---
libs/libc/wqueue/work_queue.c | 80 ++++++++++++++-----
libs/libc/wqueue/work_signal.c | 99 -----------------------
libs/libc/wqueue/work_usrthread.c | 164 +++++++++++---------------------------
libs/libc/wqueue/wqueue.h | 4 +-
7 files changed, 149 insertions(+), 251 deletions(-)
diff --git a/include/nuttx/semaphore.h b/include/nuttx/semaphore.h
index 42bd308..83fb3c1 100644
--- a/include/nuttx/semaphore.h
+++ b/include/nuttx/semaphore.h
@@ -61,7 +61,7 @@
# define _SEM_TIMEDWAIT(s,t) nxsem_timedwait(s,t)
# define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t)
# define _SEM_POST(s) nxsem_post(s)
-# define _SEM_GETVALUE(s) nxsem_get_value(s)
+# define _SEM_GETVALUE(s,v) nxsem_get_value(s,v)
# define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p)
# define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p)
# define _SEM_ERRNO(r) (-(r))
diff --git a/libs/libc/wqueue/Make.defs b/libs/libc/wqueue/Make.defs
index 72862bf..a0633d1 100644
--- a/libs/libc/wqueue/Make.defs
+++ b/libs/libc/wqueue/Make.defs
@@ -22,7 +22,7 @@ ifeq ($(CONFIG_LIB_USRWORK),y)
# Add the work queue C files to the build
-CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c
+CSRCS += work_usrthread.c work_queue.c work_cancel.c
# Add the wqueue directory to the build
diff --git a/libs/libc/wqueue/work_cancel.c b/libs/libc/wqueue/work_cancel.c
index 54e084c..77a4c34 100644
--- a/libs/libc/wqueue/work_cancel.c
+++ b/libs/libc/wqueue/work_cancel.c
@@ -48,8 +48,8 @@
* work_queue() again.
*
* Input Parameters:
- * qid - The work queue ID
- * work - The previously queued work structure to cancel
+ * wqueue - The work queue
+ * work - The previously queued work structure to cancel
*
* Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be
@@ -63,7 +63,10 @@
static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
FAR struct work_s *work)
{
+ FAR sq_entry_t *prev = NULL;
+ FAR sq_entry_t *curr;
int ret = -ENOENT;
+ int semcount;
DEBUGASSERT(work != NULL);
@@ -78,18 +81,44 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
if (work->worker != NULL)
{
- /* A little test of the integrity of the work queue */
+ /* Search the work activelist for the target work. We can't
+ * use sq_rem to do this because there are additional operations that
+ * need to be done.
+ */
- DEBUGASSERT(work->dq.flink != NULL ||
- (FAR dq_entry_t *)work == wqueue->q.tail);
- DEBUGASSERT(work->dq.blink != NULL ||
- (FAR dq_entry_t *)work == wqueue->q.head);
+ curr = wqueue->q.head;
+ while (curr && curr != &work->u.s.sq)
+ {
+ prev = curr;
+ curr = curr->flink;
+ }
- /* Remove the entry from the work queue and make sure that it is
- * marked as available (i.e., the worker field is nullified).
+ /* Check if the work was found in the list. If not, then an OS
+ * error has occurred because the work is marked active!
*/
- dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+ DEBUGASSERT(curr);
+
+ /* Now, remove the work from the work queue */
+
+ if (prev)
+ {
+ /* Remove the work from mid- or end-of-queue */
+
+ sq_remafter(prev, &wqueue->q);
+ }
+ else
+ {
+ /* Remove the work at the head of the queue */
+
+ sq_remfirst(&wqueue->q);
+ _SEM_GETVALUE(&wqueue->wake, &semcount);
+ if (semcount < 1)
+ {
+ _SEM_POST(&wqueue->wake);
+ }
+ }
+
work->worker = NULL;
ret = OK;
}
diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c
index 5f631f2..7470b4a 100644
--- a/libs/libc/wqueue/work_queue.c
+++ b/libs/libc/wqueue/work_queue.c
@@ -32,6 +32,7 @@
#include <nuttx/clock.h>
#include <nuttx/wqueue.h>
+#include <nuttx/semaphore.h>
#include "wqueue/wqueue.h"
@@ -56,7 +57,7 @@
* and remove it from the work queue.
*
* Input Parameters:
- * qid - The work queue ID (index)
+ * wqueue - The work queue
* work - The work structure to queue
* worker - The worker callback to be invoked. The callback will be
* invoked on the worker thread of execution.
@@ -74,35 +75,72 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
FAR struct work_s *work, worker_t worker,
FAR void *arg, clock_t delay)
{
- DEBUGASSERT(work != NULL);
+ FAR sq_entry_t *prev = NULL;
+ FAR sq_entry_t *curr;
+ sclock_t delta;
+ int semcount;
/* Get exclusive access to the work queue */
while (_SEM_WAIT(&wqueue->lock) < 0);
- /* Is there already pending work? */
+ /* Initialize the work structure */
- if (work->worker != NULL)
- {
- /* Remove the entry from the work queue. It will be requeued at the
- * end of the work queue.
- */
+ work->worker = worker; /* Work callback. non-NULL means queued */
+ work->arg = arg; /* Callback argument */
+ work->u.s.qtime = clock() + delay; /* Delay until work performed */
- dq_rem((FAR dq_entry_t *)work, &wqueue->q);
- }
+ /* Do the easy case first -- when the work queue is empty. */
- /* Initialize the work structure */
-
- work->worker = worker; /* Work callback. non-NULL means queued */
- work->arg = arg; /* Callback argument */
- work->delay = delay; /* Delay until work performed */
+ if (wqueue->q.head == NULL)
+ {
+ /* Add the work to the head == tail of the queue. */
- /* Now, time-tag that entry and put it in the work queue. */
+ sq_addfirst(&work->u.s.sq, &wqueue->q);
+ _SEM_POST(&wqueue->wake);
+ }
- work->qtime = clock(); /* Time work queued */
+ /* There is other pending work in the work queue */
- dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
- kill(wqueue->pid, SIGWORK); /* Wake up the worker thread */
+ else
+ {
+ curr = wqueue->q.head;
+
+ /* Check if the new work must be inserted before the curr. */
+
+ do
+ {
+ delta = work->u.s.qtime - ((FAR struct work_s *)curr)->u.s.qtime;
+ if (delta < 0)
+ {
+ break;
+ }
+
+ prev = curr;
+ curr = curr->flink;
+ }
+ while (curr != NULL);
+
+ /* Insert the new work in the list */
+
+ if (prev == NULL)
+ {
+ /* Insert the work at the head of the list */
+
+ sq_addfirst(&work->u.s.sq, &wqueue->q);
+ _SEM_GETVALUE(&wqueue->wake, &semcount);
+ if (semcount < 1)
+ {
+ _SEM_POST(&wqueue->wake);
+ }
+ }
+ else
+ {
+ /* Insert the work in mid- or end-of-queue */
+
+ sq_addafter(prev, &work->u.s.sq, &wqueue->q);
+ }
+ }
_SEM_POST(&wqueue->lock);
return OK;
@@ -146,6 +184,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
{
if (qid == USRWORK)
{
+ /* Is there already pending work? */
+
+ work_cancel(qid, work);
+
return work_qqueue(&g_usrwork, work, worker, arg, delay);
}
else
diff --git a/libs/libc/wqueue/work_signal.c b/libs/libc/wqueue/work_signal.c
deleted file mode 100644
index eae548b..0000000
--- a/libs/libc/wqueue/work_signal.c
+++ /dev/null
@@ -1,99 +0,0 @@
-/****************************************************************************
- * libs/libc/wqueue/work_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/****************************************************************************
- * Private Type Declarations
- ****************************************************************************/
-
-/****************************************************************************
- * Public Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- * Signal the worker thread to process the work queue now. This function
- * is used internally by the work logic but could also be used by the
- * user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- * qid - The work queue ID
- *
- * Returned Value:
- * Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
- int ret;
-
- if (qid == USRWORK)
- {
- /* Signal the worker thread */
-
- ret = kill(g_usrwork.pid, SIGWORK);
- if (ret < 0)
- {
- int errcode = get_errno();
- ret = -errcode;
- }
- }
- else
- {
- ret = -EINVAL;
- }
-
- return ret;
-}
-
-#endif /* CONFIG_LIB_USRWORK && !__KERNEL__ */
diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c
index 0c71b8a..8c2b345 100644
--- a/libs/libc/wqueue/work_usrthread.c
+++ b/libs/libc/wqueue/work_usrthread.c
@@ -27,7 +27,6 @@
#include <stdint.h>
#include <unistd.h>
#include <pthread.h>
-#include <signal.h>
#include <sched.h>
#include <errno.h>
#include <assert.h>
@@ -45,26 +44,12 @@
* Pre-processor Definitions
****************************************************************************/
-/* Use CLOCK_MONOTONIC if it is available. CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-# define WORK_CLOCK CLOCK_MONOTONIC
-#else
-# define WORK_CLOCK CLOCK_REALTIME
-#endif
-
#ifdef CONFIG_SYSTEM_TIME64
# define WORK_DELAY_MAX UINT64_MAX
#else
# define WORK_DELAY_MAX UINT32_MAX
#endif
-#ifndef MIN
-# define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
/****************************************************************************
* Private Type Declarations
****************************************************************************/
@@ -98,17 +83,12 @@ struct usr_wqueue_s g_usrwork;
*
****************************************************************************/
-void work_process(FAR struct usr_wqueue_s *wqueue)
+static void work_process(FAR struct usr_wqueue_s *wqueue)
{
volatile FAR struct work_s *work;
- sigset_t sigset;
- sigset_t oldset;
- worker_t worker;
+ worker_t worker;
FAR void *arg;
- clock_t elapsed;
- clock_t remaining;
- clock_t stick;
- clock_t ctick;
+ sclock_t elapsed;
clock_t next;
int ret;
@@ -125,15 +105,6 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
return;
}
- /* Set up the signal mask */
-
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGWORK);
-
- /* Get the time that we started this polling cycle in clock ticks. */
-
- stick = clock();
-
/* And check each entry in the work queue. Since we have locked the
* work queue we know: (1) we will not be suspended unless we do
* so ourselves, and (2) there will be no changes to the work queue
@@ -142,19 +113,21 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
work = (FAR struct work_s *)wqueue->q.head;
while (work)
{
- /* Is this work ready? It is ready if there is no delay or if
- * the delay has elapsed. qtime is the time that the work was added
- * to the work queue. It will always be greater than or equal to
- * zero. Therefore a delay of zero will always execute immediately.
+ /* Is this work ready? It is ready if there is no delay or if
+ * the delay has elapsed. qtime is the absolute time at which the
+ * work becomes due (queue time plus delay), so a non-negative
+ * elapsed value means the work should execute immediately.
*/
- ctick = clock();
- elapsed = ctick - work->qtime;
- if (elapsed >= work->delay)
+ elapsed = clock() - work->u.s.qtime;
+
+ /* Is this delay work ready? */
+
+ if (elapsed >= 0)
{
/* Remove the ready-to-execute work from the list */
- dq_rem((struct dq_entry_s *)work, &wqueue->q);
+ sq_remfirst(&wqueue->q);
/* Extract the work description from the entry (in case the work
* instance by the re-used after it has been de-queued).
@@ -195,65 +168,26 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
return;
}
-
- work = (FAR struct work_s *)wqueue->q.head;
}
- else
- {
- /* Canceled.. Just move to the next work in the list with
- * the work queue still locked.
- */
- work = (FAR struct work_s *)work->dq.flink;
- }
+ work = (FAR struct work_s *)wqueue->q.head;
}
- else /* elapsed < work->delay */
+ else
{
- /* This one is not ready.
- *
- * NOTE that elapsed is relative to the current time,
- * not the time of beginning of this queue processing pass.
- * So it may need an adjustment.
- */
-
- elapsed += (ctick - stick);
- if (elapsed > work->delay)
- {
- /* The delay has expired while we are processing */
-
- elapsed = work->delay;
- }
-
- /* Will it be ready before the next scheduled wakeup interval? */
-
- remaining = work->delay - elapsed;
- if (remaining < next)
- {
- /* Yes.. Then schedule to wake up when the work is ready */
-
- next = remaining;
- }
-
- /* Then try the next in the list. */
-
- work = (FAR struct work_s *)work->dq.flink;
+ next = work->u.s.qtime - clock();
+ break;
}
}
- /* Unlock the work queue before waiting. In order to assure that we do
- * not lose the SIGWORK signal before waiting, we block the SIGWORK
- * signals before unlocking the work queue. That will cause in SIGWORK
- * signals directed to the worker thread to pend.
- */
+ /* Unlock the work queue before waiting. */
- sigprocmask(SIG_BLOCK, &sigset, &oldset);
_SEM_POST(&wqueue->lock);
if (next == WORK_DELAY_MAX)
{
- /* Wait indefinitely until signaled with SIGWORK */
+ /* Wait indefinitely until work_queue has new items */
- sigwaitinfo(&sigset, NULL);
+ _SEM_WAIT(&wqueue->wake);
}
else
{
@@ -261,7 +195,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
time_t sec;
/* Wait awhile to check the work list. We will wait here until
- * either the time elapses or until we are awakened by a signal.
+ * either the time elapses or until we are awakened by a semaphore.
* Interrupts will be re-enabled while we wait.
*/
@@ -269,10 +203,8 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
rqtp.tv_sec = sec;
rqtp.tv_nsec = (next - (sec * 1000000)) * 1000;
- sigtimedwait(&sigset, NULL, &rqtp);
+ _SEM_TIMEDWAIT(&wqueue->wake, &rqtp);
}
-
- sigprocmask(SIG_SETMASK, &oldset, NULL);
}
/****************************************************************************
@@ -339,53 +271,48 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg)
int work_usrstart(void)
{
+ int ret;
+#ifndef CONFIG_BUILD_PROTECTED
+ pthread_t usrwork;
+ pthread_attr_t attr;
+ struct sched_param param;
+#endif
+
/* Set up the work queue lock */
_SEM_INIT(&g_usrwork.lock, 0, 1);
+ _SEM_INIT(&g_usrwork.wake, 0, 0);
+ _SEM_SETPROTOCOL(&g_usrwork.wake, SEM_PRIO_NONE);
+
+ /* Initialize the work queue */
+
+ sq_init(&g_usrwork.q);
+
#ifdef CONFIG_BUILD_PROTECTED
/* Start a user-mode worker thread for use by applications. */
- g_usrwork.pid = task_create("uwork",
- CONFIG_LIB_USRWORKPRIORITY,
- CONFIG_LIB_USRWORKSTACKSIZE,
- (main_t)work_usrthread,
- (FAR char * const *)NULL);
-
- DEBUGASSERT(g_usrwork.pid > 0);
- if (g_usrwork.pid < 0)
+ ret = task_create("uwork",
+ CONFIG_LIB_USRWORKPRIORITY,
+ CONFIG_LIB_USRWORKSTACKSIZE,
+ (main_t)work_usrthread,
+ ((FAR char * const *)NULL));
+ if (ret < 0)
{
int errcode = get_errno();
DEBUGASSERT(errcode > 0);
return -errcode;
}
- return g_usrwork.pid;
+ return ret;
#else
- pthread_t usrwork;
- pthread_attr_t attr;
- struct sched_param param;
- int ret;
-
/* Start a user-mode worker thread for use by applications. */
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE);
-#ifdef CONFIG_SCHED_SPORADIC
- /* Get the current sporadic scheduling parameters. Those will not be
- * modified.
- */
-
- ret = set_getparam(pid, &param);
- if (ret < 0)
- {
- int errcode = get_errno();
- return -errcode;
- }
-#endif
-
+ pthread_attr_getschedparam(&attr, &param);
param.sched_priority = CONFIG_LIB_USRWORKPRIORITY;
pthread_attr_setschedparam(&attr, &param);
@@ -401,8 +328,7 @@ int work_usrstart(void)
pthread_detach(usrwork);
- g_usrwork.pid = (pid_t)usrwork;
- return g_usrwork.pid;
+ return (pid_t)usrwork;
#endif
}
diff --git a/libs/libc/wqueue/wqueue.h b/libs/libc/wqueue/wqueue.h
index e84c229..4c03421 100644
--- a/libs/libc/wqueue/wqueue.h
+++ b/libs/libc/wqueue/wqueue.h
@@ -46,9 +46,9 @@
struct usr_wqueue_s
{
- struct dq_queue_s q; /* The queue of pending work */
+ struct sq_queue_s q; /* The queue of pending work */
sem_t lock; /* exclusive access to user-mode work queue */
- pid_t pid; /* The task ID of the worker thread(s) */
+ sem_t wake; /* The wake-up semaphore of the usrthread */
};
/****************************************************************************