You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by xi...@apache.org on 2021/07/28 04:01:44 UTC

[incubator-nuttx] branch master updated (bb63afd -> 23d87ff)

This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git.


    from bb63afd  Documentation/bl602: Update some imformation; Add partition.toml in tool/bl602
     new a0c3a09  sched/wqueue: merge kwork_lpthread.c and kwork_hpthread.c to kwork_thread.c
     new 855c78b  work_queue: schedule the work queue using the timer mechanism
     new 00854f0  userspace/wqueue: move exclusive access lock to mqueue inside
     new 23d87ff  usrwqueue: implement order work queue

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/nuttx/semaphore.h         |   2 +-
 include/nuttx/wqueue.h            |  35 ++---
 libs/libc/wqueue/Make.defs        |   3 +-
 libs/libc/wqueue/work_cancel.c    |  53 +++++--
 libs/libc/wqueue/work_lock.c      | 105 --------------
 libs/libc/wqueue/work_queue.c     |  84 ++++++++---
 libs/libc/wqueue/work_signal.c    |  99 -------------
 libs/libc/wqueue/work_usrthread.c | 189 +++++++-----------------
 libs/libc/wqueue/wqueue.h         |  50 +------
 sched/wqueue/Make.defs            |  13 +-
 sched/wqueue/kwork_cancel.c       |  17 ++-
 sched/wqueue/kwork_hpthread.c     | 171 ----------------------
 sched/wqueue/kwork_lpthread.c     | 171 ----------------------
 sched/wqueue/kwork_notifier.c     |   1 -
 sched/wqueue/kwork_process.c      | 271 ----------------------------------
 sched/wqueue/kwork_queue.c        | 130 ++++++++---------
 sched/wqueue/kwork_signal.c       | 111 --------------
 sched/wqueue/kwork_thread.c       | 297 ++++++++++++++++++++++++++++++++++++++
 sched/wqueue/wqueue.h             |  34 ++---
 19 files changed, 549 insertions(+), 1287 deletions(-)
 delete mode 100644 libs/libc/wqueue/work_lock.c
 delete mode 100644 libs/libc/wqueue/work_signal.c
 delete mode 100644 sched/wqueue/kwork_hpthread.c
 delete mode 100644 sched/wqueue/kwork_lpthread.c
 delete mode 100644 sched/wqueue/kwork_process.c
 delete mode 100644 sched/wqueue/kwork_signal.c
 create mode 100644 sched/wqueue/kwork_thread.c

[incubator-nuttx] 04/04: usrwqueue: implement order work queue

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit 23d87ff9df116f0f43465a13adc2fbe720fc3466
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Mon Jul 26 10:36:13 2021 +0800

    usrwqueue: implement order work queue
    
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 include/nuttx/semaphore.h         |   2 +-
 libs/libc/wqueue/Make.defs        |   2 +-
 libs/libc/wqueue/work_cancel.c    |  49 +++++++++---
 libs/libc/wqueue/work_queue.c     |  80 ++++++++++++++-----
 libs/libc/wqueue/work_signal.c    |  99 -----------------------
 libs/libc/wqueue/work_usrthread.c | 164 +++++++++++---------------------------
 libs/libc/wqueue/wqueue.h         |   4 +-
 7 files changed, 149 insertions(+), 251 deletions(-)

diff --git a/include/nuttx/semaphore.h b/include/nuttx/semaphore.h
index 42bd308..83fb3c1 100644
--- a/include/nuttx/semaphore.h
+++ b/include/nuttx/semaphore.h
@@ -61,7 +61,7 @@
 #  define _SEM_TIMEDWAIT(s,t)   nxsem_timedwait(s,t)
 #  define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t)
 #  define _SEM_POST(s)          nxsem_post(s)
-#  define _SEM_GETVALUE(s)      nxsem_get_value(s)
+#  define _SEM_GETVALUE(s,v)    nxsem_get_value(s,v)
 #  define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p)
 #  define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p)
 #  define _SEM_ERRNO(r)         (-(r))
diff --git a/libs/libc/wqueue/Make.defs b/libs/libc/wqueue/Make.defs
index 72862bf..a0633d1 100644
--- a/libs/libc/wqueue/Make.defs
+++ b/libs/libc/wqueue/Make.defs
@@ -22,7 +22,7 @@ ifeq ($(CONFIG_LIB_USRWORK),y)
 
 # Add the work queue C files to the build
 
-CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c
+CSRCS += work_usrthread.c work_queue.c work_cancel.c
 
 # Add the wqueue directory to the build
 
diff --git a/libs/libc/wqueue/work_cancel.c b/libs/libc/wqueue/work_cancel.c
index 54e084c..77a4c34 100644
--- a/libs/libc/wqueue/work_cancel.c
+++ b/libs/libc/wqueue/work_cancel.c
@@ -48,8 +48,8 @@
  *   work_queue() again.
  *
  * Input Parameters:
- *   qid    - The work queue ID
- *   work   - The previously queued work structure to cancel
+ *   wqueue - The work queue
+ *   work   - The previously queue work structure to cancel
  *
  * Returned Value:
  *   Zero (OK) on success, a negated errno on failure.  This error may be
@@ -63,7 +63,10 @@
 static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
                         FAR struct work_s *work)
 {
+  FAR sq_entry_t *prev = NULL;
+  FAR sq_entry_t *curr;
   int ret = -ENOENT;
+  int semcount;
 
   DEBUGASSERT(work != NULL);
 
@@ -78,18 +81,44 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
 
   if (work->worker != NULL)
     {
-      /* A little test of the integrity of the work queue */
+      /* Search the work activelist for the target work. We can't
+       * use sq_rem to do this because there are additional operations that
+       * need to be done.
+       */
 
-      DEBUGASSERT(work->dq.flink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.tail);
-      DEBUGASSERT(work->dq.blink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.head);
+      curr = wqueue->q.head;
+      while (curr && curr != &work->u.s.sq)
+        {
+          prev = curr;
+          curr = curr->flink;
+        }
 
-      /* Remove the entry from the work queue and make sure that it is
-       * marked as available (i.e., the worker field is nullified).
+      /* Check if the work was found in the list.  If not, then an OS
+       * error has occurred because the work is marked active!
        */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+      DEBUGASSERT(curr);
+
+      /* Now, remove the work from the work queue */
+
+      if (prev)
+        {
+          /* Remove the work from mid- or end-of-queue */
+
+          sq_remafter(prev, &wqueue->q);
+        }
+      else
+        {
+          /* Remove the work at the head of the queue */
+
+          sq_remfirst(&wqueue->q);
+          _SEM_GETVALUE(&wqueue->wake, &semcount);
+          if (semcount < 1)
+            {
+              _SEM_POST(&wqueue->wake);
+            }
+        }
+
       work->worker = NULL;
       ret = OK;
     }
diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c
index 5f631f2..7470b4a 100644
--- a/libs/libc/wqueue/work_queue.c
+++ b/libs/libc/wqueue/work_queue.c
@@ -32,6 +32,7 @@
 
 #include <nuttx/clock.h>
 #include <nuttx/wqueue.h>
+#include <nuttx/semaphore.h>
 
 #include "wqueue/wqueue.h"
 
@@ -56,7 +57,7 @@
  *   and remove it from the work queue.
  *
  * Input Parameters:
- *   qid    - The work queue ID (index)
+ *   wqueue - The work queue
  *   work   - The work structure to queue
  *   worker - The worker callback to be invoked.  The callback will be
  *            invoked on the worker thread of execution.
@@ -74,35 +75,72 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
                        FAR struct work_s *work, worker_t worker,
                        FAR void *arg, clock_t delay)
 {
-  DEBUGASSERT(work != NULL);
+  FAR sq_entry_t *prev = NULL;
+  FAR sq_entry_t *curr;
+  sclock_t delta;
+  int semcount;
 
   /* Get exclusive access to the work queue */
 
   while (_SEM_WAIT(&wqueue->lock) < 0);
 
-  /* Is there already pending work? */
+  /* Initialize the work structure */
 
-  if (work->worker != NULL)
-    {
-      /* Remove the entry from the work queue.  It will be requeued at the
-       * end of the work queue.
-       */
+  work->worker = worker;             /* Work callback. non-NULL means queued */
+  work->arg    = arg;                /* Callback argument */
+  work->u.s.qtime = clock() + delay; /* Delay until work performed */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
-    }
+  /* Do the easy case first -- when the work queue is empty. */
 
-  /* Initialize the work structure */
-
-  work->worker = worker;           /* Work callback. non-NULL means queued */
-  work->arg    = arg;              /* Callback argument */
-  work->delay  = delay;            /* Delay until work performed */
+  if (wqueue->q.head == NULL)
+    {
+      /* Add the watchdog to the head == tail of the queue. */
 
-  /* Now, time-tag that entry and put it in the work queue. */
+      sq_addfirst(&work->u.s.sq, &wqueue->q);
+      _SEM_POST(&wqueue->wake);
+    }
 
-  work->qtime  = clock(); /* Time work queued */
+  /* There are other active watchdogs in the timer queue */
 
-  dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
-  kill(wqueue->pid, SIGWORK);   /* Wake up the worker thread */
+  else
+    {
+      curr = wqueue->q.head;
+
+      /* Check if the new work must be inserted before the curr. */
+
+      do
+        {
+          delta = work->u.s.qtime - ((FAR struct work_s *)curr)->u.s.qtime;
+          if (delta < 0)
+            {
+              break;
+            }
+
+          prev = curr;
+          curr = curr->flink;
+        }
+      while (curr != NULL);
+
+      /* Insert the new watchdog in the list */
+
+      if (prev == NULL)
+        {
+          /* Insert the watchdog at the head of the list */
+
+          sq_addfirst(&work->u.s.sq, &wqueue->q);
+          _SEM_GETVALUE(&wqueue->wake, &semcount);
+          if (semcount < 1)
+            {
+              _SEM_POST(&wqueue->wake);
+            }
+        }
+      else
+        {
+          /* Insert the watchdog in mid- or end-of-queue */
+
+          sq_addafter(prev, &work->u.s.sq, &wqueue->q);
+        }
+    }
 
   _SEM_POST(&wqueue->lock);
   return OK;
@@ -146,6 +184,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 {
   if (qid == USRWORK)
     {
+      /* Is there already pending work? */
+
+      work_cancel(qid, work);
+
       return work_qqueue(&g_usrwork, work, worker, arg, delay);
     }
   else
diff --git a/libs/libc/wqueue/work_signal.c b/libs/libc/wqueue/work_signal.c
deleted file mode 100644
index eae548b..0000000
--- a/libs/libc/wqueue/work_signal.c
+++ /dev/null
@@ -1,99 +0,0 @@
-/****************************************************************************
- * libs/libc/wqueue/work_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/****************************************************************************
- * Private Type Declarations
- ****************************************************************************/
-
-/****************************************************************************
- * Public Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
-  int ret;
-
-  if (qid == USRWORK)
-    {
-      /* Signal the worker thread */
-
-      ret = kill(g_usrwork.pid, SIGWORK);
-      if (ret < 0)
-        {
-          int errcode = get_errno();
-          ret = -errcode;
-        }
-    }
-  else
-    {
-      ret = -EINVAL;
-    }
-
-  return ret;
-}
-
-#endif /* CONFIG_LIB_USRWORK && !__KERNEL__ */
diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c
index 0c71b8a..8c2b345 100644
--- a/libs/libc/wqueue/work_usrthread.c
+++ b/libs/libc/wqueue/work_usrthread.c
@@ -27,7 +27,6 @@
 #include <stdint.h>
 #include <unistd.h>
 #include <pthread.h>
-#include <signal.h>
 #include <sched.h>
 #include <errno.h>
 #include <assert.h>
@@ -45,26 +44,12 @@
  * Pre-processor Definitions
  ****************************************************************************/
 
-/* Use CLOCK_MONOTONIC if it is available.  CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-#  define WORK_CLOCK CLOCK_MONOTONIC
-#else
-#  define WORK_CLOCK CLOCK_REALTIME
-#endif
-
 #ifdef CONFIG_SYSTEM_TIME64
 #  define WORK_DELAY_MAX UINT64_MAX
 #else
 #  define WORK_DELAY_MAX UINT32_MAX
 #endif
 
-#ifndef MIN
-#  define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
 /****************************************************************************
  * Private Type Declarations
  ****************************************************************************/
@@ -98,17 +83,12 @@ struct usr_wqueue_s g_usrwork;
  *
  ****************************************************************************/
 
-void work_process(FAR struct usr_wqueue_s *wqueue)
+static void work_process(FAR struct usr_wqueue_s *wqueue)
 {
   volatile FAR struct work_s *work;
-  sigset_t sigset;
-  sigset_t oldset;
-  worker_t  worker;
+  worker_t worker;
   FAR void *arg;
-  clock_t elapsed;
-  clock_t remaining;
-  clock_t stick;
-  clock_t ctick;
+  sclock_t elapsed;
   clock_t next;
   int ret;
 
@@ -125,15 +105,6 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       return;
     }
 
-  /* Set up the signal mask */
-
-  sigemptyset(&sigset);
-  sigaddset(&sigset, SIGWORK);
-
-  /* Get the time that we started this polling cycle in clock ticks. */
-
-  stick = clock();
-
   /* And check each entry in the work queue.  Since we have locked the
    * work queue we know:  (1) we will not be suspended unless we do
    * so ourselves, and (2) there will be no changes to the work queue
@@ -142,19 +113,21 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
   work = (FAR struct work_s *)wqueue->q.head;
   while (work)
     {
-      /* Is this work ready?  It is ready if there is no delay or if
-       * the delay has elapsed. qtime is the time that the work was added
-       * to the work queue.  It will always be greater than or equal to
-       * zero.  Therefore a delay of zero will always execute immediately.
+      /* Is this work ready? It is ready if there is no delay or if
+       * the delay has elapsed.  is the time that the work was added
+       * to the work queue. Therefore a delay of equal or less than
+       * zero will always execute immediately.
        */
 
-      ctick   = clock();
-      elapsed = ctick - work->qtime;
-      if (elapsed >= work->delay)
+      elapsed = clock() - work->u.s.qtime;
+
+      /* Is this delay work ready? */
+
+      if (elapsed >= 0)
         {
           /* Remove the ready-to-execute work from the list */
 
-          dq_rem((struct dq_entry_s *)work, &wqueue->q);
+          sq_remfirst(&wqueue->q);
 
           /* Extract the work description from the entry (in case the work
            * instance by the re-used after it has been de-queued).
@@ -195,65 +168,26 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
 
                   return;
                 }
-
-              work = (FAR struct work_s *)wqueue->q.head;
             }
-          else
-            {
-              /* Canceled.. Just move to the next work in the list with
-               * the work queue still locked.
-               */
 
-              work = (FAR struct work_s *)work->dq.flink;
-            }
+          work = (FAR struct work_s *)wqueue->q.head;
         }
-      else /* elapsed < work->delay */
+      else
         {
-          /* This one is not ready.
-           *
-           * NOTE that elapsed is relative to the current time,
-           * not the time of beginning of this queue processing pass.
-           * So it may need an adjustment.
-           */
-
-          elapsed += (ctick - stick);
-          if (elapsed > work->delay)
-            {
-              /* The delay has expired while we are processing */
-
-              elapsed = work->delay;
-            }
-
-          /* Will it be ready before the next scheduled wakeup interval? */
-
-          remaining = work->delay - elapsed;
-          if (remaining < next)
-            {
-              /* Yes.. Then schedule to wake up when the work is ready */
-
-              next = remaining;
-            }
-
-          /* Then try the next in the list. */
-
-          work = (FAR struct work_s *)work->dq.flink;
+          next = work->u.s.qtime - clock();
+          break;
         }
     }
 
-  /* Unlock the work queue before waiting.  In order to assure that we do
-   * not lose the SIGWORK signal before waiting, we block the SIGWORK
-   * signals before unlocking the work queue.  That will cause in SIGWORK
-   * signals directed to the worker thread to pend.
-   */
+  /* Unlock the work queue before waiting. */
 
-  sigprocmask(SIG_BLOCK, &sigset, &oldset);
   _SEM_POST(&wqueue->lock);
 
   if (next == WORK_DELAY_MAX)
     {
-      /* Wait indefinitely until signaled with SIGWORK */
+      /* Wait indefinitely until work_queue has new items */
 
-      sigwaitinfo(&sigset, NULL);
+      _SEM_WAIT(&wqueue->wake);
     }
   else
     {
@@ -261,7 +195,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       time_t sec;
 
       /* Wait awhile to check the work list.  We will wait here until
-       * either the time elapses or until we are awakened by a signal.
+       * either the time elapses or until we are awakened by a semaphore.
        * Interrupts will be re-enabled while we wait.
        */
 
@@ -269,10 +203,8 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       rqtp.tv_sec  = sec;
       rqtp.tv_nsec = (next - (sec * 1000000)) * 1000;
 
-      sigtimedwait(&sigset, NULL, &rqtp);
+      _SEM_TIMEDWAIT(&wqueue->wake, &rqtp);
     }
-
-  sigprocmask(SIG_SETMASK, &oldset, NULL);
 }
 
 /****************************************************************************
@@ -339,53 +271,48 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg)
 
 int work_usrstart(void)
 {
+  int ret;
+#ifndef CONFIG_BUILD_PROTECTED
+  pthread_t usrwork;
+  pthread_attr_t attr;
+  struct sched_param param;
+#endif
+
   /* Set up the work queue lock */
 
   _SEM_INIT(&g_usrwork.lock, 0, 1);
 
+  _SEM_INIT(&g_usrwork.wake, 0, 0);
+  _SEM_SETPROTOCOL(&g_usrwork.wake, SEM_PRIO_NONE);
+
+  /* Initialize the work queue */
+
+  sq_init(&g_usrwork.q);
+
 #ifdef CONFIG_BUILD_PROTECTED
 
   /* Start a user-mode worker thread for use by applications. */
 
-  g_usrwork.pid = task_create("uwork",
-                              CONFIG_LIB_USRWORKPRIORITY,
-                              CONFIG_LIB_USRWORKSTACKSIZE,
-                              (main_t)work_usrthread,
-                              (FAR char * const *)NULL);
-
-  DEBUGASSERT(g_usrwork.pid > 0);
-  if (g_usrwork.pid < 0)
+  ret = task_create("uwork",
+                    CONFIG_LIB_USRWORKPRIORITY,
+                    CONFIG_LIB_USRWORKSTACKSIZE,
+                    (main_t)work_usrthread,
+                    ((FAR char * const *)NULL));
+  if (ret < 0)
     {
       int errcode = get_errno();
       DEBUGASSERT(errcode > 0);
       return -errcode;
     }
 
-  return g_usrwork.pid;
+  return ret;
 #else
-  pthread_t usrwork;
-  pthread_attr_t attr;
-  struct sched_param param;
-  int ret;
-
   /* Start a user-mode worker thread for use by applications. */
 
   pthread_attr_init(&attr);
   pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE);
 
-#ifdef CONFIG_SCHED_SPORADIC
-  /* Get the current sporadic scheduling parameters.  Those will not be
-   * modified.
-   */
-
-  ret = set_getparam(pid, &param);
-  if (ret < 0)
-    {
-      int errcode = get_errno();
-      return -errcode;
-    }
-#endif
-
+  pthread_attr_getschedparam(&attr, &param);
   param.sched_priority = CONFIG_LIB_USRWORKPRIORITY;
   pthread_attr_setschedparam(&attr, &param);
 
@@ -401,8 +328,7 @@ int work_usrstart(void)
 
   pthread_detach(usrwork);
 
-  g_usrwork.pid = (pid_t)usrwork;
-  return g_usrwork.pid;
+  return (pid_t)usrwork;
 #endif
 }
 
diff --git a/libs/libc/wqueue/wqueue.h b/libs/libc/wqueue/wqueue.h
index e84c229..4c03421 100644
--- a/libs/libc/wqueue/wqueue.h
+++ b/libs/libc/wqueue/wqueue.h
@@ -46,9 +46,9 @@
 
 struct usr_wqueue_s
 {
-  struct dq_queue_s q;      /* The queue of pending work */
+  struct sq_queue_s q;      /* The queue of pending work */
   sem_t             lock;   /* exclusive access to user-mode work queue */
-  pid_t             pid;    /* The task ID of the worker thread(s) */
+  sem_t             wake;   /* The wake-up semaphore of the  usrthread */
 };
 
 /****************************************************************************

[incubator-nuttx] 03/04: userspace/wqueue: move exclusive access lock to mqueue inside

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit 00854f0f94704735301fdfe801a448e8a0e443d9
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Mon Jul 12 20:58:03 2021 +0800

    userspace/wqueue: move exclusive access lock to mqueue inside
    
    Change-Id: I885d5641bc81fedf698c241d4719cb3561700f17
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 libs/libc/wqueue/Make.defs        |   1 -
 libs/libc/wqueue/work_cancel.c    |   4 +-
 libs/libc/wqueue/work_lock.c      | 105 --------------------------------------
 libs/libc/wqueue/work_queue.c     |   4 +-
 libs/libc/wqueue/work_usrthread.c |  25 +++------
 libs/libc/wqueue/wqueue.h         |  46 +----------------
 6 files changed, 12 insertions(+), 173 deletions(-)

diff --git a/libs/libc/wqueue/Make.defs b/libs/libc/wqueue/Make.defs
index aa5685b..72862bf 100644
--- a/libs/libc/wqueue/Make.defs
+++ b/libs/libc/wqueue/Make.defs
@@ -23,7 +23,6 @@ ifeq ($(CONFIG_LIB_USRWORK),y)
 # Add the work queue C files to the build
 
 CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c
-CSRCS += work_lock.c
 
 # Add the wqueue directory to the build
 
diff --git a/libs/libc/wqueue/work_cancel.c b/libs/libc/wqueue/work_cancel.c
index 6af60c1..54e084c 100644
--- a/libs/libc/wqueue/work_cancel.c
+++ b/libs/libc/wqueue/work_cancel.c
@@ -69,7 +69,7 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
 
   /* Get exclusive access to the work queue */
 
-  while (work_lock() < 0);
+  while (_SEM_WAIT(&wqueue->lock) < 0);
 
   /* Cancelling the work is simply a matter of removing the work structure
    * from the work queue.  This must be done with interrupts disabled because
@@ -94,7 +94,7 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
       ret = OK;
     }
 
-  work_unlock();
+  _SEM_POST(&wqueue->lock);
   return ret;
 }
 
diff --git a/libs/libc/wqueue/work_lock.c b/libs/libc/wqueue/work_lock.c
deleted file mode 100644
index 26e6e6e..0000000
--- a/libs/libc/wqueue/work_lock.c
+++ /dev/null
@@ -1,105 +0,0 @@
-/****************************************************************************
- * libs/libc/wqueue/work_lock.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <pthread.h>
-#include <assert.h>
-#include <errno.h>
-
-#include <nuttx/semaphore.h>
-
-#include "wqueue/wqueue.h"
-
-#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_lock
- *
- * Description:
- *   Lock the user-mode work queue.
- *
- * Input Parameters:
- *   None
- *
- * Returned Value:
- *   Zero (OK) on success, a negated errno on failure.  This error may be
- *   reported:
- *
- *   -EINTR - Wait was interrupted by a signal
- *
- ****************************************************************************/
-
-int work_lock(void)
-{
-  int ret;
-
-#ifdef CONFIG_BUILD_PROTECTED
-  ret = _SEM_WAIT(&g_usrsem);
-  if (ret < 0)
-    {
-      DEBUGASSERT(_SEM_ERRNO(ret) == EINTR ||
-                  _SEM_ERRNO(ret) == ECANCELED);
-      return -EINTR;
-    }
-#else
-  ret = pthread_mutex_lock(&g_usrmutex);
-  if (ret != 0)
-    {
-      DEBUGASSERT(ret == EINTR);
-      return -EINTR;
-    }
-#endif
-
-  return ret;
-}
-
-/****************************************************************************
- * Name: work_unlock
- *
- * Description:
- *   Unlock the user-mode work queue.
- *
- * Input Parameters:
- *   None
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_unlock(void)
-{
-#ifdef CONFIG_BUILD_PROTECTED
-  _SEM_POST(&g_usrsem);
-#else
-  pthread_mutex_unlock(&g_usrmutex);
-#endif
-}
-
-#endif /* CONFIG_LIB_USRWORK && !__KERNEL__*/
diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c
index 1be23a0..5f631f2 100644
--- a/libs/libc/wqueue/work_queue.c
+++ b/libs/libc/wqueue/work_queue.c
@@ -78,7 +78,7 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
 
   /* Get exclusive access to the work queue */
 
-  while (work_lock() < 0);
+  while (_SEM_WAIT(&wqueue->lock) < 0);
 
   /* Is there already pending work? */
 
@@ -104,7 +104,7 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
   dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
   kill(wqueue->pid, SIGWORK);   /* Wake up the worker thread */
 
-  work_unlock();
+  _SEM_POST(&wqueue->lock);
   return OK;
 }
 
diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c
index 466493c..0c71b8a 100644
--- a/libs/libc/wqueue/work_usrthread.c
+++ b/libs/libc/wqueue/work_usrthread.c
@@ -77,14 +77,6 @@
 
 struct usr_wqueue_s g_usrwork;
 
-/* This semaphore supports exclusive access to the user-mode work queue */
-
-#ifdef CONFIG_BUILD_PROTECTED
-sem_t g_usrsem;
-#else
-pthread_mutex_t g_usrmutex;
-#endif
-
 /****************************************************************************
  * Private Functions
  ****************************************************************************/
@@ -125,7 +117,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
    */
 
   next = WORK_DELAY_MAX;
-  ret = work_lock();
+  ret = _SEM_WAIT(&wqueue->lock);
   if (ret < 0)
     {
       /* Break out earlier if we were awakened by a signal */
@@ -188,7 +180,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
                * performed... we don't have any idea how long this will take!
                */
 
-              work_unlock();
+              _SEM_POST(&wqueue->lock);
               worker(arg);
 
               /* Now, unfortunately, since we unlocked the work queue we
@@ -196,7 +188,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
                * start back at the head of the list.
                */
 
-              ret = work_lock();
+              ret = _SEM_WAIT(&wqueue->lock);
               if (ret < 0)
                 {
                   /* Break out earlier if we were awakened by a signal */
@@ -255,7 +247,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
    */
 
   sigprocmask(SIG_BLOCK, &sigset, &oldset);
-  work_unlock();
+  _SEM_POST(&wqueue->lock);
 
   if (next == WORK_DELAY_MAX)
     {
@@ -347,10 +339,11 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg)
 
 int work_usrstart(void)
 {
-#ifdef CONFIG_BUILD_PROTECTED
   /* Set up the work queue lock */
 
-  _SEM_INIT(&g_usrsem, 0, 1);
+  _SEM_INIT(&g_usrwork.lock, 0, 1);
+
+#ifdef CONFIG_BUILD_PROTECTED
 
   /* Start a user-mode worker thread for use by applications. */
 
@@ -375,10 +368,6 @@ int work_usrstart(void)
   struct sched_param param;
   int ret;
 
-  /* Set up the work queue lock */
-
-  pthread_mutex_init(&g_usrmutex, NULL);
-
   /* Start a user-mode worker thread for use by applications. */
 
   pthread_attr_init(&attr);
diff --git a/libs/libc/wqueue/wqueue.h b/libs/libc/wqueue/wqueue.h
index 45203fa..e84c229 100644
--- a/libs/libc/wqueue/wqueue.h
+++ b/libs/libc/wqueue/wqueue.h
@@ -47,6 +47,7 @@
 struct usr_wqueue_s
 {
   struct dq_queue_s q;      /* The queue of pending work */
+  sem_t             lock;   /* exclusive access to user-mode work queue */
   pid_t             pid;    /* The task ID of the worker thread(s) */
 };
 
@@ -58,54 +59,9 @@ struct usr_wqueue_s
 
 extern struct usr_wqueue_s g_usrwork;
 
-/* This semaphore/mutex supports exclusive access to the user-mode work
- * queue
- */
-
-#ifdef CONFIG_BUILD_PROTECTED
-extern sem_t g_usrsem;
-#else
-extern pthread_mutex_t g_usrmutex;
-#endif
-
 /****************************************************************************
  * Public Function Prototypes
  ****************************************************************************/
 
-/****************************************************************************
- * Name: work_lock
- *
- * Description:
- *   Lock the user-mode work queue.
- *
- * Input Parameters:
- *   None
- *
- * Returned Value:
- *   Zero (OK) on success, a negated errno on failure.  This error may be
- *   reported:
- *
- *   -EINTR - Wait was interrupted by a signal
- *
- ****************************************************************************/
-
-int work_lock(void);
-
-/****************************************************************************
- * Name: work_unlock
- *
- * Description:
- *   Unlock the user-mode work queue.
- *
- * Input Parameters:
- *   None
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_unlock(void);
-
 #endif /* CONFIG_LIB_USRWORK && !__KERNEL__*/
 #endif /* __LIBC_WQUEUE_WQUEUE_H */

[incubator-nuttx] 01/04: sched/wqueue: merge kwork_lpthread.c and kwork_hpthread.c to kwork_thread.c

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit a0c3a0923a17cacafecdceb4bd4522011ac435ad
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Fri Jun 25 14:24:45 2021 +0800

    sched/wqueue: merge kwork_lpthread.c and kwork_hpthread.c to kwork_thread.c
    
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 sched/wqueue/Make.defs                            |  12 +-
 sched/wqueue/kwork_lpthread.c                     | 171 ----------------------
 sched/wqueue/kwork_notifier.c                     |   1 -
 sched/wqueue/{kwork_hpthread.c => kwork_thread.c} | 163 ++++++++++++++-------
 4 files changed, 114 insertions(+), 233 deletions(-)

diff --git a/sched/wqueue/Make.defs b/sched/wqueue/Make.defs
index 5e8bd86..bc31d53 100644
--- a/sched/wqueue/Make.defs
+++ b/sched/wqueue/Make.defs
@@ -23,21 +23,11 @@
 ifeq ($(CONFIG_SCHED_WORKQUEUE),y)
 
 CSRCS += kwork_queue.c kwork_process.c kwork_cancel.c kwork_signal.c
+CSRCS += kwork_thread.c
 
-# Add high priority work queue files
-
-ifeq ($(CONFIG_SCHED_HPWORK),y)
-CSRCS += kwork_hpthread.c
-endif
-
-# Add low priority work queue files
-
-ifeq ($(CONFIG_SCHED_LPWORK),y)
-CSRCS += kwork_lpthread.c
 ifeq ($(CONFIG_PRIORITY_INHERITANCE),y)
 CSRCS += kwork_inherit.c
 endif # CONFIG_PRIORITY_INHERITANCE
-endif # CONFIG_SCHED_LPWORK
 
 # Add work queue notifier support
 
diff --git a/sched/wqueue/kwork_lpthread.c b/sched/wqueue/kwork_lpthread.c
deleted file mode 100644
index 0cea4a3..0000000
--- a/sched/wqueue/kwork_lpthread.c
+++ /dev/null
@@ -1,171 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_lpthread.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <unistd.h>
-#include <sched.h>
-#include <string.h>
-#include <assert.h>
-#include <errno.h>
-#include <queue.h>
-#include <debug.h>
-
-#include <nuttx/wqueue.h>
-#include <nuttx/kthread.h>
-#include <nuttx/kmalloc.h>
-#include <nuttx/clock.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_LPWORK
-
-/****************************************************************************
- * Public Data
- ****************************************************************************/
-
-/* The state of the kernel mode, low priority work queue(s). */
-
-struct lp_wqueue_s g_lpwork;
-
-/****************************************************************************
- * Private Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_lpthread
- *
- * Description:
- *   These are the worker thread(s) that performs the actions placed on the
- *   low priority work queue.
- *
- *   These, along with the higher priority worker thread are the kernel mode
- *   work queues (also build in the flat build).
- *
- *   All kernel mode worker threads are started by the OS during normal
- *   bring up.  This entry point is referenced by OS internally and should
- *   not be accessed by application logic.
- *
- * Input Parameters:
- *   argc, argv (not used)
- *
- * Returned Value:
- *   Does not return
- *
- ****************************************************************************/
-
-static int work_lpthread(int argc, char *argv[])
-{
-  int wndx = 0;
-#if CONFIG_SCHED_LPNTHREADS > 1
-  pid_t me = getpid();
-  int i;
-
-  /* Find out thread index by search the workers in g_lpwork */
-
-  for (wndx = 0, i = 0; i < CONFIG_SCHED_LPNTHREADS; i++)
-    {
-      if (g_lpwork.worker[i].pid == me)
-        {
-          wndx = i;
-          break;
-        }
-    }
-
-  DEBUGASSERT(i < CONFIG_SCHED_LPNTHREADS);
-#endif
-
-  /* Loop forever */
-
-  for (; ; )
-    {
-      /* Then process queued work.  work_process will not return until:
-       * (1) there is no further work in the work queue, and (2) signal is
-       * triggered, or delayed work expires.
-       */
-
-      work_process((FAR struct kwork_wqueue_s *)&g_lpwork, wndx);
-    }
-
-  return OK; /* To keep some compilers happy */
-}
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_start_lowpri
- *
- * Description:
- *   Start the low-priority, kernel-mode worker thread(s)
- *
- * Input Parameters:
- *   None
- *
- * Returned Value:
- *   The task ID of the worker thread is returned on success.  A negated
- *   errno value is returned on failure.
- *
- ****************************************************************************/
-
-int work_start_lowpri(void)
-{
-  pid_t pid;
-  int wndx;
-
-  /* Don't permit any of the threads to run until we have fully initialized
-   * g_lpwork.
-   */
-
-  sched_lock();
-
-  /* Start the low-priority, kernel mode worker thread(s) */
-
-  sinfo("Starting low-priority kernel worker thread(s)\n");
-
-  for (wndx = 0; wndx < CONFIG_SCHED_LPNTHREADS; wndx++)
-    {
-      pid = kthread_create(LPWORKNAME, CONFIG_SCHED_LPWORKPRIORITY,
-                           CONFIG_SCHED_LPWORKSTACKSIZE,
-                           (main_t)work_lpthread,
-                           (FAR char * const *)NULL);
-
-      DEBUGASSERT(pid > 0);
-      if (pid < 0)
-        {
-          serr("ERROR: kthread_create %d failed: %d\n", wndx, (int)pid);
-          sched_unlock();
-          return (int)pid;
-        }
-
-      g_lpwork.worker[wndx].pid  = pid;
-      g_lpwork.worker[wndx].busy = true;
-    }
-
-  sched_unlock();
-  return g_lpwork.worker[0].pid;
-}
-
-#endif /* CONFIG_SCHED_LPWORK */
diff --git a/sched/wqueue/kwork_notifier.c b/sched/wqueue/kwork_notifier.c
index 012e3bb..09cb1a6 100644
--- a/sched/wqueue/kwork_notifier.c
+++ b/sched/wqueue/kwork_notifier.c
@@ -33,7 +33,6 @@
 #include <assert.h>
 
 #include <nuttx/kmalloc.h>
-#include <nuttx/semaphore.h>
 #include <nuttx/wqueue.h>
 
 #include "wqueue/wqueue.h"
diff --git a/sched/wqueue/kwork_hpthread.c b/sched/wqueue/kwork_thread.c
similarity index 53%
rename from sched/wqueue/kwork_hpthread.c
rename to sched/wqueue/kwork_thread.c
index e44efe8..3c3b7eb 100644
--- a/sched/wqueue/kwork_hpthread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -1,5 +1,5 @@
 /****************************************************************************
- * sched/wqueue/kwork_hpthread.c
+ * sched/wqueue/kwork_thread.c
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -26,7 +26,9 @@
 
 #include <unistd.h>
 #include <sched.h>
+#include <stdio.h>
 #include <string.h>
+#include <stdlib.h>
 #include <errno.h>
 #include <assert.h>
 #include <queue.h>
@@ -34,27 +36,33 @@
 
 #include <nuttx/wqueue.h>
 #include <nuttx/kthread.h>
-#include <nuttx/kmalloc.h>
-#include <nuttx/clock.h>
 
 #include "wqueue/wqueue.h"
 
-#ifdef CONFIG_SCHED_HPWORK
+#if defined(CONFIG_SCHED_WORKQUEUE)
 
 /****************************************************************************
  * Public Data
  ****************************************************************************/
 
+#if defined(CONFIG_SCHED_HPWORK)
 /* The state of the kernel mode, high priority work queue(s). */
 
 struct hp_wqueue_s g_hpwork;
+#endif /* CONFIG_SCHED_HPWORK */
+
+#if defined(CONFIG_SCHED_LPWORK)
+/* The state of the kernel mode, low priority work queue(s). */
+
+struct lp_wqueue_s g_lpwork;
+#endif /* CONFIG_SCHED_LPWORK */
 
 /****************************************************************************
  * Private Functions
  ****************************************************************************/
 
 /****************************************************************************
- * Name: work_hpthread
+ * Name: work_thread
  *
  * Description:
  *   These are the worker threads that performs the actions placed on the
@@ -75,26 +83,14 @@ struct hp_wqueue_s g_hpwork;
  *
  ****************************************************************************/
 
-static int work_hpthread(int argc, char *argv[])
+static int work_thread(int argc, char *argv[])
 {
-  int wndx = 0;
-#if CONFIG_SCHED_HPNTHREADS > 1
-  pid_t me = getpid();
-  int i;
-
-  /* Find out thread index by search the workers in g_hpwork */
-
-  for (wndx = 0, i = 0; i < CONFIG_SCHED_HPNTHREADS; i++)
-    {
-      if (g_hpwork.worker[i].pid == me)
-        {
-          wndx = i;
-          break;
-        }
-    }
+  FAR struct kwork_wqueue_s *queue;
+  int wndx;
 
-  DEBUGASSERT(i < CONFIG_SCHED_HPNTHREADS);
-#endif
+  queue  = (FAR struct kwork_wqueue_s *)
+           ((uintptr_t)strtoul(argv[1], NULL, 0));
+  wndx   = atoi(argv[2]);
 
   /* Loop forever */
 
@@ -105,67 +101,134 @@ static int work_hpthread(int argc, char *argv[])
        * triggered, or delayed work expires.
        */
 
-      work_process((FAR struct kwork_wqueue_s *)&g_hpwork, wndx);
+      work_process(queue, wndx);
     }
 
   return OK; /* To keep some compilers happy */
 }
 
 /****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_start_highpri
+ * Name: work_thread_create
  *
  * Description:
- *   Start the high-priority, kernel-mode worker thread(s)
+ *   This function creates and activates a work thread task with kernel-
+ *   mode privileges.
  *
  * Input Parameters:
- *   None
+ *   name       - Name of the new task
+ *   priority   - Priority of the new task
+ *   stack_size - size (in bytes) of the stack needed
+ *   nthread    - Number of work thread should be created
+ *   wqueue     - Work queue instance
  *
  * Returned Value:
- *   The task ID of the worker thread is returned on success.  A negated
- *   errno value is returned on failure.
+ *   A negated errno value is returned on failure.
  *
  ****************************************************************************/
 
-int work_start_highpri(void)
+static int work_thread_create(FAR const char *name, int priority,
+                              int stack_size, int nthread,
+                              FAR struct kwork_wqueue_s *wqueue)
 {
-  pid_t pid;
+  FAR char *argv[3];
+  char args[2][16];
   int wndx;
+  int pid;
+
+  snprintf(args[0], 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
+  argv[0] = args[0];
+  argv[2] = NULL;
 
   /* Don't permit any of the threads to run until we have fully initialized
-   * g_hpwork.
+   * g_hpwork and g_lpwork.
    */
 
   sched_lock();
 
-  /* Start the high-priority, kernel mode worker thread(s) */
-
-  sinfo("Starting high-priority kernel worker thread(s)\n");
-
-  for (wndx = 0; wndx < CONFIG_SCHED_HPNTHREADS; wndx++)
+  for (wndx = 0; wndx < nthread; wndx++)
     {
-      pid = kthread_create(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
-                           CONFIG_SCHED_HPWORKSTACKSIZE,
-                           (main_t)work_hpthread,
-                           (FAR char * const *)NULL);
+      snprintf(args[1], 16, "%d", wndx);
+      argv[1] = args[1];
+
+      pid = kthread_create(name, priority, stack_size,
+                           (main_t)work_thread, argv);
 
       DEBUGASSERT(pid > 0);
       if (pid < 0)
         {
-          serr("ERROR: kthread_create %d failed: %d\n", wndx, (int)pid);
+          serr("ERROR: work_thread_create %d failed: %d\n", wndx, pid);
           sched_unlock();
-          return (int)pid;
+          return pid;
         }
 
-      g_hpwork.worker[wndx].pid  = pid;
-      g_hpwork.worker[wndx].busy = true;
+#ifdef CONFIG_PRIORITY_INHERITANCE
+      wqueue->worker[wndx].pid  = pid;
+#endif
+      wqueue->worker[wndx].busy = true;
     }
 
   sched_unlock();
-  return g_hpwork.worker[0].pid;
+  return OK;
 }
 
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Name: work_start_highpri
+ *
+ * Description:
+ *   Start the high-priority, kernel-mode worker thread(s)
+ *
+ * Input Parameters:
+ *   None
+ *
+ * Returned Value:
+ *   A negated errno value is returned on failure.
+ *
+ ****************************************************************************/
+
+#if defined(CONFIG_SCHED_HPWORK)
+int work_start_highpri(void)
+{
+  /* Start the high-priority, kernel mode worker thread(s) */
+
+  sinfo("Starting high-priority kernel worker thread(s)\n");
+
+  return work_thread_create(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
+                            CONFIG_SCHED_HPWORKSTACKSIZE,
+                            CONFIG_SCHED_HPNTHREADS,
+                            (FAR struct kwork_wqueue_s *)&g_hpwork);
+}
 #endif /* CONFIG_SCHED_HPWORK */
+
+/****************************************************************************
+ * Name: work_start_lowpri
+ *
+ * Description:
+ *   Start the low-priority, kernel-mode worker thread(s)
+ *
+ * Input Parameters:
+ *   None
+ *
+ * Returned Value:
+ *   A negated errno value is returned on failure.
+ *
+ ****************************************************************************/
+
+#if defined(CONFIG_SCHED_LPWORK)
+int work_start_lowpri(void)
+{
+  /* Start the low-priority, kernel mode worker thread(s) */
+
+  sinfo("Starting low-priority kernel worker thread(s)\n");
+
+  return work_thread_create(LPWORKNAME, CONFIG_SCHED_LPWORKPRIORITY,
+                            CONFIG_SCHED_LPWORKSTACKSIZE,
+                            CONFIG_SCHED_LPNTHREADS,
+                            (FAR struct kwork_wqueue_s *)&g_lpwork);
+}
+#endif /* CONFIG_SCHED_LPWORK */
+
+#endif /* CONFIG_SCHED_WORKQUEUE */

[incubator-nuttx] 02/04: work_queue: schedule the work queue using the timer mechanism

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit 855c78bb9dc48d4f2655257fb7ee01ff9c456cbc
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Sat Jun 19 17:29:30 2021 +0800

    work_queue: schedule the work queue using the timer mechanism
    
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 include/nuttx/wqueue.h       |  35 ++----
 sched/wqueue/Make.defs       |   3 +-
 sched/wqueue/kwork_cancel.c  |  17 +--
 sched/wqueue/kwork_process.c | 271 -------------------------------------------
 sched/wqueue/kwork_queue.c   | 130 ++++++++++-----------
 sched/wqueue/kwork_signal.c  | 111 ------------------
 sched/wqueue/kwork_thread.c  | 103 ++++++++++++----
 sched/wqueue/wqueue.h        |  34 ++----
 8 files changed, 174 insertions(+), 530 deletions(-)

diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h
index e78c46c..5a2d221 100644
--- a/include/nuttx/wqueue.h
+++ b/include/nuttx/wqueue.h
@@ -32,6 +32,7 @@
 #include <queue.h>
 
 #include <nuttx/clock.h>
+#include <nuttx/wdog.h>
 
 /****************************************************************************
  * Pre-processor Definitions
@@ -244,11 +245,17 @@ typedef CODE void (*worker_t)(FAR void *arg);
 
 struct work_s
 {
-  struct dq_entry_s dq;  /* Implements a doubly linked list */
-  worker_t  worker;      /* Work callback */
-  FAR void *arg;         /* Callback argument */
-  clock_t qtime;         /* Time work queued */
-  clock_t delay;         /* Delay until work performed */
+  union
+  {
+    struct
+    {
+      struct sq_entry_s sq; /* Implements a single linked list */
+      clock_t qtime;        /* Time work queued */
+    } s;
+    struct wdog_s timer;    /* Delay expiry timer */
+  } u;
+  worker_t  worker;         /* Work callback */
+  FAR void *arg;            /* Callback argument */
 };
 
 /* This is an enumeration of the various events that may be
@@ -375,24 +382,6 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 int work_cancel(int qid, FAR struct work_s *work);
 
 /****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid);
-
-/****************************************************************************
  * Name: work_available
  *
  * Description:
diff --git a/sched/wqueue/Make.defs b/sched/wqueue/Make.defs
index bc31d53..52fc34b 100644
--- a/sched/wqueue/Make.defs
+++ b/sched/wqueue/Make.defs
@@ -22,8 +22,7 @@
 
 ifeq ($(CONFIG_SCHED_WORKQUEUE),y)
 
-CSRCS += kwork_queue.c kwork_process.c kwork_cancel.c kwork_signal.c
-CSRCS += kwork_thread.c
+CSRCS += kwork_queue.c kwork_thread.c kwork_cancel.c
 
 ifeq ($(CONFIG_PRIORITY_INHERITANCE),y)
 CSRCS += kwork_inherit.c
diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index 69886b9..b5936b5 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -77,18 +77,19 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
   flags = enter_critical_section();
   if (work->worker != NULL)
     {
-      /* A little test of the integrity of the work queue */
-
-      DEBUGASSERT(work->dq.flink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.tail);
-      DEBUGASSERT(work->dq.blink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.head);
-
       /* Remove the entry from the work queue and make sure that it is
        * marked as available (i.e., the worker field is nullified).
        */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+      if (WDOG_ISACTIVE(&work->u.timer))
+        {
+          wd_cancel(&work->u.timer);
+        }
+      else
+        {
+          sq_rem((FAR sq_entry_t *)work, &wqueue->q);
+        }
+
       work->worker = NULL;
       ret = OK;
     }
diff --git a/sched/wqueue/kwork_process.c b/sched/wqueue/kwork_process.c
deleted file mode 100644
index 9bec97e..0000000
--- a/sched/wqueue/kwork_process.c
+++ /dev/null
@@ -1,271 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_process.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <debug.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <assert.h>
-#include <queue.h>
-
-#include <nuttx/irq.h>
-#include <nuttx/clock.h>
-#include <nuttx/signal.h>
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/* Use CLOCK_MONOTONIC if it is available.  CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-#  define WORK_CLOCK CLOCK_MONOTONIC
-#else
-#  define WORK_CLOCK CLOCK_REALTIME
-#endif
-
-#ifdef CONFIG_SYSTEM_TIME64
-#  define WORK_DELAY_MAX UINT64_MAX
-#else
-#  define WORK_DELAY_MAX UINT32_MAX
-#endif
-
-#ifndef MIN
-#  define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
-#ifndef CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE
-#  define CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE 0
-#endif
-
-#if CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
-#  define CALL_WORKER(worker, arg) \
-     do \
-       { \
-         uint32_t start; \
-         uint32_t elapsed; \
-         start = up_critmon_gettime(); \
-         worker(arg); \
-         elapsed = up_critmon_gettime() - start; \
-         if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
-           { \
-             serr("WORKER %p execute too long %"PRIu32"\n", \
-                   worker, elapsed); \
-           } \
-       } \
-     while (0)
-#else
-#  define CALL_WORKER(worker, arg) worker(arg)
-#endif
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_process
- *
- * Description:
- *   This is the logic that performs actions placed on any work list.  This
- *   logic is the common underlying logic to all work queues.  This logic is
- *   part of the internal implementation of each work queue; it should not
- *   be called from application level logic.
- *
- * Input Parameters:
- *   wqueue - Describes the work queue to be processed
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx)
-{
-  volatile FAR struct work_s *work;
-  worker_t  worker;
-  irqstate_t flags;
-  FAR void *arg;
-  clock_t elapsed;
-  clock_t remaining;
-  clock_t stick;
-  clock_t ctick;
-  clock_t next;
-
-  /* Then process queued work.  We need to keep interrupts disabled while
-   * we process items in the work list.
-   */
-
-  next  = WORK_DELAY_MAX;
-  flags = enter_critical_section();
-
-  /* Get the time that we started processing the queue in clock ticks. */
-
-  stick = clock_systime_ticks();
-
-  /* And check each entry in the work queue.  Since we have disabled
-   * interrupts we know:  (1) we will not be suspended unless we do
-   * so ourselves, and (2) there will be no changes to the work queue
-   */
-
-  work = (FAR struct work_s *)wqueue->q.head;
-  while (work != NULL)
-    {
-      /* Is this work ready?  It is ready if there is no delay or if
-       * the delay has elapsed. qtime is the time that the work was added
-       * to the work queue.  It will always be greater than or equal to
-       * zero.  Therefore a delay of zero will always execute immediately.
-       */
-
-      ctick   = clock_systime_ticks();
-      elapsed = ctick - work->qtime;
-      if (elapsed >= work->delay)
-        {
-          /* Remove the ready-to-execute work from the list */
-
-          dq_rem((struct dq_entry_s *)work, &wqueue->q);
-
-          /* Extract the work description from the entry (in case the work
-           * instance by the re-used after it has been de-queued).
-           */
-
-          worker = work->worker;
-
-          /* Check for a race condition where the work may be nullified
-           * before it is removed from the queue.
-           */
-
-          if (worker != NULL)
-            {
-              /* Extract the work argument (before re-enabling interrupts) */
-
-              arg = work->arg;
-
-              /* Mark the work as no longer being queued */
-
-              work->worker = NULL;
-
-              /* Do the work.  Re-enable interrupts while the work is being
-               * performed... we don't have any idea how long this will take!
-               */
-
-              leave_critical_section(flags);
-              CALL_WORKER(worker, arg);
-
-              /* Now, unfortunately, since we re-enabled interrupts we don't
-               * know the state of the work list and we will have to start
-               * back at the head of the list.
-               */
-
-              flags = enter_critical_section();
-              work  = (FAR struct work_s *)wqueue->q.head;
-            }
-          else
-            {
-              /* Cancelled.. Just move to the next work in the list with
-               * interrupts still disabled.
-               */
-
-              work = (FAR struct work_s *)work->dq.flink;
-            }
-        }
-      else /* elapsed < work->delay */
-        {
-          /* This one is not ready.
-           *
-           * NOTE that elapsed is relative to the current time,
-           * not the time of beginning of this queue processing pass.
-           * So it may need an adjustment.
-           */
-
-          elapsed += (ctick - stick);
-          if (elapsed > work->delay)
-            {
-              /* The delay has expired while we are processing */
-
-              elapsed = work->delay;
-            }
-
-          /* Will it be ready before the next scheduled wakeup interval? */
-
-          remaining = work->delay - elapsed;
-          if (remaining < next)
-            {
-              /* Yes.. Then schedule to wake up when the work is ready */
-
-              next = remaining;
-            }
-
-          /* Then try the next in the list. */
-
-          work = (FAR struct work_s *)work->dq.flink;
-        }
-    }
-
-  /* When multiple worker threads are created for this work queue, only
-   * thread 0 (wndx = 0) will monitor the unexpired works.
-   *
-   * Other worker threads (wndx > 0) just process no-delay or expired
-   * works, then sleep. The unexpired works are left in the queue. They
-   * will be handled by thread 0 when it finishes current work and iterate
-   * over the queue again.
-   */
-
-  if (wndx > 0 || next == WORK_DELAY_MAX)
-    {
-      sigset_t set;
-
-      /* Wait indefinitely until signalled with SIGWORK */
-
-      sigemptyset(&set);
-      nxsig_addset(&set, SIGWORK);
-
-      wqueue->worker[wndx].busy = false;
-      DEBUGVERIFY(nxsig_waitinfo(&set, NULL));
-      wqueue->worker[wndx].busy = true;
-    }
-  else
-    {
-      /* Wait a while to check the work list.  We will wait here until
-       * either the time elapses or until we are awakened by a signal.
-       * Interrupts will be re-enabled while we wait.
-       */
-
-      wqueue->worker[wndx].busy = false;
-      nxsig_usleep(next * USEC_PER_TICK);
-      wqueue->worker[wndx].busy = true;
-    }
-
-  leave_critical_section(flags);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c
index 17fedb2..1b868f1 100644
--- a/sched/wqueue/kwork_queue.c
+++ b/sched/wqueue/kwork_queue.c
@@ -43,73 +43,32 @@
  ****************************************************************************/
 
 /****************************************************************************
- * Name: work_qqueue
- *
- * Description:
- *   Queue work to be performed at a later time.  All queued work will be
- *   performed on the worker thread of execution (not the caller's).
- *
- *   The work structure is allocated by caller, but completely managed by
- *   the work queue logic.  The caller should never modify the contents of
- *   the work queue structure; the caller should not call work_qqueue()
- *   again until either (1) the previous work has been performed and removed
- *   from the queue, or (2) work_cancel() has been called to cancel the work
- *   and remove it from the work queue.
- *
- * Input Parameters:
- *   qid    - The work queue ID (index)
- *   work   - The work structure to queue
- *   worker - The worker callback to be invoked.  The callback will be
- *            invoked on the worker thread of execution.
- *   arg    - The argument that will be passed to the worker callback when
- *            int is invoked.
- *   delay  - Delay (in clock ticks) from the time queue until the worker
- *            is invoked. Zero means to perform the work immediately.
- *
- * Returned Value:
- *   None
- *
+ * Name: hp_work_timer_expiry
  ****************************************************************************/
 
-static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
-                        FAR struct work_s *work, worker_t worker,
-                        FAR void *arg, clock_t delay)
+#ifdef CONFIG_SCHED_HPWORK
+static void hp_work_timer_expiry(wdparm_t arg)
 {
-  irqstate_t flags;
-
-  DEBUGASSERT(work != NULL && worker != NULL);
-
-  /* Interrupts are disabled so that this logic can be called from with
-   * task logic or ifrom nterrupt handling logic.
-   */
-
-  flags = enter_critical_section();
-
-  /* Is there already pending work? */
-
-  if (work->worker != NULL)
-    {
-      /* Remove the entry from the work queue.  It will be requeued at the
-       * end of the work queue.
-       */
-
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
-    }
-
-  /* Initialize the work structure. */
-
-  work->worker = worker;           /* Work callback. non-NULL means queued */
-  work->arg    = arg;              /* Callback argument */
-  work->delay  = delay;            /* Delay until work performed */
-
-  /* Now, time-tag that entry and put it in the work queue */
-
-  work->qtime  = clock_systime_ticks(); /* Time work queued */
+  irqstate_t flags = enter_critical_section();
+  sq_addlast((FAR sq_entry_t *)arg, &g_hpwork.q);
+  nxsem_post(&g_hpwork.sem);
+  leave_critical_section(flags);
+}
+#endif
 
-  dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
+/****************************************************************************
+ * Name: lp_work_timer_expiry
+ ****************************************************************************/
 
+#ifdef CONFIG_SCHED_LPWORK
+static void lp_work_timer_expiry(wdparm_t arg)
+{
+  irqstate_t flags = enter_critical_section();
+  sq_addlast((FAR sq_entry_t *)arg, &g_lpwork.q);
+  nxsem_post(&g_lpwork.sem);
   leave_critical_section(flags);
 }
+#endif
 
 /****************************************************************************
  * Public Functions
@@ -148,6 +107,23 @@ static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
 int work_queue(int qid, FAR struct work_s *work, worker_t worker,
                FAR void *arg, clock_t delay)
 {
+  irqstate_t flags;
+
+  /* Remove the entry from the timer and work queue. */
+
+  work_cancel(qid, work);
+
+  /* Interrupts are disabled so that this logic can be called from with
+   * task logic or from interrupt handling logic.
+   */
+
+  flags = enter_critical_section();
+
+  /* Initialize the work structure. */
+
+  work->worker = worker;           /* Work callback. non-NULL means queued */
+  work->arg = arg;                 /* Callback argument */
+
   /* Queue the new work */
 
 #ifdef CONFIG_SCHED_HPWORK
@@ -155,9 +131,16 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
     {
       /* Queue high priority work */
 
-      work_qqueue((FAR struct kwork_wqueue_s *)&g_hpwork, work, worker,
-                  arg, delay);
-      return work_signal(HPWORK);
+      if (!delay)
+        {
+          sq_addlast((FAR sq_entry_t *)work, &g_hpwork.q);
+          nxsem_post(&g_hpwork.sem);
+        }
+      else
+        {
+          wd_start(&work->u.timer, delay, hp_work_timer_expiry,
+                   (wdparm_t)work);
+        }
     }
   else
 #endif
@@ -166,15 +149,22 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
     {
       /* Queue low priority work */
 
-      work_qqueue((FAR struct kwork_wqueue_s *)&g_lpwork, work, worker,
-                  arg, delay);
-      return work_signal(LPWORK);
+      if (!delay)
+        {
+          sq_addlast((FAR sq_entry_t *)work, &g_lpwork.q);
+          nxsem_post(&g_lpwork.sem);
+        }
+      else
+        {
+          wd_start(&work->u.timer, delay, lp_work_timer_expiry,
+                   (wdparm_t)work);
+        }
     }
-  else
 #endif
-    {
-      return -EINVAL;
-    }
+
+  leave_critical_section(flags);
+
+  return OK;
 }
 
 #endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_signal.c b/sched/wqueue/kwork_signal.c
deleted file mode 100644
index 0f2084f..0000000
--- a/sched/wqueue/kwork_signal.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-#include <nuttx/signal.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero (OK) on success, a negated errno value on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
-  FAR struct kwork_wqueue_s *work;
-  int threads;
-  int i;
-
-  /* Get the process ID of the worker thread */
-
-#ifdef CONFIG_SCHED_HPWORK
-  if (qid == HPWORK)
-    {
-      work = (FAR struct kwork_wqueue_s *)&g_hpwork;
-      threads = CONFIG_SCHED_HPNTHREADS;
-    }
-  else
-#endif
-#ifdef CONFIG_SCHED_LPWORK
-  if (qid == LPWORK)
-    {
-      work = (FAR struct kwork_wqueue_s *)&g_lpwork;
-      threads = CONFIG_SCHED_LPNTHREADS;
-    }
-  else
-#endif
-    {
-      return -EINVAL;
-    }
-
-  /* Find an IDLE worker thread */
-
-  for (i = 0; i < threads; i++)
-    {
-      /* Is this worker thread busy? */
-
-      if (!work->worker[i].busy)
-        {
-          /* No.. select this thread */
-
-          break;
-        }
-    }
-
-  /* If all of the IDLE threads are busy, then just return successfully */
-
-  if (i >= threads)
-    {
-      return OK;
-    }
-
-  /* Otherwise, signal the first IDLE thread found */
-
-  return nxsig_kill(work->worker[i].pid, SIGWORK);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 3c3b7eb..2e48399 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -36,12 +36,37 @@
 
 #include <nuttx/wqueue.h>
 #include <nuttx/kthread.h>
+#include <nuttx/semaphore.h>
 
 #include "wqueue/wqueue.h"
 
 #if defined(CONFIG_SCHED_WORKQUEUE)
 
 /****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#if defined(CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) && CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
+#  define CALL_WORKER(worker, arg) \
+     do \
+       { \
+         uint32_t start; \
+         uint32_t elapsed; \
+         start = up_critmon_gettime(); \
+         worker(arg); \
+         elapsed = up_critmon_gettime() - start; \
+         if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
+           { \
+             serr("WORKER %p execute too long %"PRIu32"\n", \
+                   worker, elapsed); \
+           } \
+       } \
+     while (0)
+#else
+#  define CALL_WORKER(worker, arg) worker(arg)
+#endif
+
+/****************************************************************************
  * Public Data
  ****************************************************************************/
 
@@ -65,45 +90,84 @@ struct lp_wqueue_s g_lpwork;
  * Name: work_thread
  *
  * Description:
- *   These are the worker threads that performs the actions placed on the
+ *   These are the worker threads that perform the actions placed on the
  *   high priority work queue.
  *
  *   These, along with the lower priority worker thread(s) are the kernel
- *   mode work queues (also build in the flat build).
+ *   mode work queues (also built in the flat build).
  *
  *   All kernel mode worker threads are started by the OS during normal
  *   bring up.  This entry point is referenced by OS internally and should
  *   not be accessed by application logic.
  *
  * Input Parameters:
- *   argc, argv (not used)
+ *   argc, argv
  *
  * Returned Value:
  *   Does not return
  *
  ****************************************************************************/
 
-static int work_thread(int argc, char *argv[])
+static int work_thread(int argc, FAR char *argv[])
 {
-  FAR struct kwork_wqueue_s *queue;
-  int wndx;
+  FAR struct kwork_wqueue_s *wqueue;
+  FAR struct work_s *work;
+  worker_t  worker;
+  irqstate_t flags;
+  FAR void *arg;
 
-  queue  = (FAR struct kwork_wqueue_s *)
+  wqueue = (FAR struct kwork_wqueue_s *)
            ((uintptr_t)strtoul(argv[1], NULL, 0));
-  wndx   = atoi(argv[2]);
+
+  flags = enter_critical_section();
 
   /* Loop forever */
 
   for (; ; )
     {
       /* Then process queued work.  work_process will not return until: (1)
-       * there is no further work in the work queue, and (2) signal is
-       * triggered, or delayed work expires.
+       * there is no further work in the work queue, and (2) semaphore is
+       * posted.
+       */
+
+      nxsem_wait_uninterruptible(&wqueue->sem);
+
+      /* And check each entry in the work queue.  Since we have disabled
+       * interrupts we know:  (1) we will not be suspended unless we do
+       * so ourselves, and (2) there will be no changes to the work queue
        */
 
-      work_process(queue, wndx);
+      /* Remove the ready-to-execute work from the list */
+
+      work = (FAR struct work_s *)sq_remfirst(&wqueue->q);
+      if (work && work->worker)
+        {
+          /* Extract the work description from the entry (in case the work
+           * instance will be re-used after it has been de-queued).
+           */
+
+          worker = work->worker;
+
+          /* Extract the work argument (before re-enabling interrupts) */
+
+          arg = work->arg;
+
+          /* Mark the work as no longer being queued */
+
+          work->worker = NULL;
+
+          /* Do the work.  Re-enable interrupts while the work is being
+           * performed... we don't have any idea how long this will take!
+           */
+
+          leave_critical_section(flags);
+          CALL_WORKER(worker, arg);
+          flags = enter_critical_section();
+        }
     }
 
+  leave_critical_section(flags);
+
   return OK; /* To keep some compilers happy */
 }
 
@@ -130,14 +194,17 @@ static int work_thread_create(FAR const char *name, int priority,
                               int stack_size, int nthread,
                               FAR struct kwork_wqueue_s *wqueue)
 {
-  FAR char *argv[3];
-  char args[2][16];
+  FAR char *argv[2];
+  char args[16];
   int wndx;
   int pid;
 
-  snprintf(args[0], 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
-  argv[0] = args[0];
-  argv[2] = NULL;
+  snprintf(args, 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
+  argv[0] = args;
+  argv[1] = NULL;
+
+  nxsem_init(&wqueue->sem, 0, 0);
+  nxsem_set_protocol(&wqueue->sem, SEM_PRIO_NONE);
 
   /* Don't permit any of the threads to run until we have fully initialized
    * g_hpwork and g_lpwork.
@@ -147,9 +214,6 @@ static int work_thread_create(FAR const char *name, int priority,
 
   for (wndx = 0; wndx < nthread; wndx++)
     {
-      snprintf(args[1], 16, "%d", wndx);
-      argv[1] = args[1];
-
       pid = kthread_create(name, priority, stack_size,
                            (main_t)work_thread, argv);
 
@@ -164,7 +228,6 @@ static int work_thread_create(FAR const char *name, int priority,
 #ifdef CONFIG_PRIORITY_INHERITANCE
       wqueue->worker[wndx].pid  = pid;
 #endif
-      wqueue->worker[wndx].busy = true;
     }
 
   sched_unlock();
diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h
index 106cb2d..774cebc 100644
--- a/sched/wqueue/wqueue.h
+++ b/sched/wqueue/wqueue.h
@@ -51,15 +51,17 @@
 
 struct kworker_s
 {
-  pid_t             pid;    /* The task ID of the worker thread */
-  volatile bool     busy;   /* True: Worker is not available */
+#ifdef CONFIG_PRIORITY_INHERITANCE
+  pid_t             pid;       /* The task ID of the worker thread */
+#endif
 };
 
 /* This structure defines the state of one kernel-mode work queue */
 
 struct kwork_wqueue_s
 {
-  struct dq_queue_s q;         /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
   struct kworker_s  worker[1]; /* Describes a worker thread */
 };
 
@@ -70,7 +72,8 @@ struct kwork_wqueue_s
 #ifdef CONFIG_SCHED_HPWORK
 struct hp_wqueue_s
 {
-  struct dq_queue_s q;         /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
 
   /* Describes each thread in the high priority queue's thread pool */
 
@@ -85,7 +88,8 @@ struct hp_wqueue_s
 #ifdef CONFIG_SCHED_LPWORK
 struct lp_wqueue_s
 {
-  struct dq_queue_s q;      /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
 
   /* Describes each thread in the low priority queue's thread pool */
 
@@ -152,26 +156,6 @@ int work_start_lowpri(void);
 #endif
 
 /****************************************************************************
- * Name: work_process
- *
- * Description:
- *   This is the logic that performs actions placed on any work list.  This
- *   logic is the common underlying logic to all work queues.  This logic is
- *   part of the internal implementation of each work queue; it should not
- *   be called from application level logic.
- *
- * Input Parameters:
- *   wqueue - Describes the work queue to be processed
- *   wndx   - The worker thread index
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx);
-
-/****************************************************************************
  * Name: work_initialize_notifier
  *
  * Description: