You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by xi...@apache.org on 2021/07/28 04:01:48 UTC

[incubator-nuttx] 04/04: usrwqueue: implement order work queue

This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit 23d87ff9df116f0f43465a13adc2fbe720fc3466
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Mon Jul 26 10:36:13 2021 +0800

    usrwqueue: implement order work queue
    
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 include/nuttx/semaphore.h         |   2 +-
 libs/libc/wqueue/Make.defs        |   2 +-
 libs/libc/wqueue/work_cancel.c    |  49 +++++++++---
 libs/libc/wqueue/work_queue.c     |  80 ++++++++++++++-----
 libs/libc/wqueue/work_signal.c    |  99 -----------------------
 libs/libc/wqueue/work_usrthread.c | 164 +++++++++++---------------------------
 libs/libc/wqueue/wqueue.h         |   4 +-
 7 files changed, 149 insertions(+), 251 deletions(-)

diff --git a/include/nuttx/semaphore.h b/include/nuttx/semaphore.h
index 42bd308..83fb3c1 100644
--- a/include/nuttx/semaphore.h
+++ b/include/nuttx/semaphore.h
@@ -61,7 +61,7 @@
 #  define _SEM_TIMEDWAIT(s,t)   nxsem_timedwait(s,t)
 #  define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t)
 #  define _SEM_POST(s)          nxsem_post(s)
-#  define _SEM_GETVALUE(s)      nxsem_get_value(s)
+#  define _SEM_GETVALUE(s,v)    nxsem_get_value(s,v)
 #  define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p)
 #  define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p)
 #  define _SEM_ERRNO(r)         (-(r))
diff --git a/libs/libc/wqueue/Make.defs b/libs/libc/wqueue/Make.defs
index 72862bf..a0633d1 100644
--- a/libs/libc/wqueue/Make.defs
+++ b/libs/libc/wqueue/Make.defs
@@ -22,7 +22,7 @@ ifeq ($(CONFIG_LIB_USRWORK),y)
 
 # Add the work queue C files to the build
 
-CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c
+CSRCS += work_usrthread.c work_queue.c work_cancel.c
 
 # Add the wqueue directory to the build
 
diff --git a/libs/libc/wqueue/work_cancel.c b/libs/libc/wqueue/work_cancel.c
index 54e084c..77a4c34 100644
--- a/libs/libc/wqueue/work_cancel.c
+++ b/libs/libc/wqueue/work_cancel.c
@@ -48,8 +48,8 @@
  *   work_queue() again.
  *
  * Input Parameters:
- *   qid    - The work queue ID
- *   work   - The previously queued work structure to cancel
+ *   wqueue - The work queue
+ *   work   - The previously queued work structure to cancel
  *
  * Returned Value:
  *   Zero (OK) on success, a negated errno on failure.  This error may be
@@ -63,7 +63,10 @@
 static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
                         FAR struct work_s *work)
 {
+  FAR sq_entry_t *prev = NULL;
+  FAR sq_entry_t *curr;
   int ret = -ENOENT;
+  int semcount;
 
   DEBUGASSERT(work != NULL);
 
@@ -78,18 +81,44 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
 
   if (work->worker != NULL)
     {
-      /* A little test of the integrity of the work queue */
+      /* Search the work activelist for the target work. We can't
+       * use sq_rem to do this because there are additional operations that
+       * need to be done.
+       */
 
-      DEBUGASSERT(work->dq.flink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.tail);
-      DEBUGASSERT(work->dq.blink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.head);
+      curr = wqueue->q.head;
+      while (curr && curr != &work->u.s.sq)
+        {
+          prev = curr;
+          curr = curr->flink;
+        }
 
-      /* Remove the entry from the work queue and make sure that it is
-       * marked as available (i.e., the worker field is nullified).
+      /* Check if the work was found in the list.  If not, then an OS
+       * error has occurred because the work is marked active!
        */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+      DEBUGASSERT(curr);
+
+      /* Now, remove the work from the work queue */
+
+      if (prev)
+        {
+          /* Remove the work from mid- or end-of-queue */
+
+          sq_remafter(prev, &wqueue->q);
+        }
+      else
+        {
+          /* Remove the work at the head of the queue */
+
+          sq_remfirst(&wqueue->q);
+          _SEM_GETVALUE(&wqueue->wake, &semcount);
+          if (semcount < 1)
+            {
+              _SEM_POST(&wqueue->wake);
+            }
+        }
+
       work->worker = NULL;
       ret = OK;
     }
diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c
index 5f631f2..7470b4a 100644
--- a/libs/libc/wqueue/work_queue.c
+++ b/libs/libc/wqueue/work_queue.c
@@ -32,6 +32,7 @@
 
 #include <nuttx/clock.h>
 #include <nuttx/wqueue.h>
+#include <nuttx/semaphore.h>
 
 #include "wqueue/wqueue.h"
 
@@ -56,7 +57,7 @@
  *   and remove it from the work queue.
  *
  * Input Parameters:
- *   qid    - The work queue ID (index)
+ *   wqueue - The work queue
  *   work   - The work structure to queue
  *   worker - The worker callback to be invoked.  The callback will be
  *            invoked on the worker thread of execution.
@@ -74,35 +75,72 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
                        FAR struct work_s *work, worker_t worker,
                        FAR void *arg, clock_t delay)
 {
-  DEBUGASSERT(work != NULL);
+  FAR sq_entry_t *prev = NULL;
+  FAR sq_entry_t *curr;
+  sclock_t delta;
+  int semcount;
 
   /* Get exclusive access to the work queue */
 
   while (_SEM_WAIT(&wqueue->lock) < 0);
 
-  /* Is there already pending work? */
+  /* Initialize the work structure */
 
-  if (work->worker != NULL)
-    {
-      /* Remove the entry from the work queue.  It will be requeued at the
-       * end of the work queue.
-       */
+  work->worker = worker;             /* Work callback. non-NULL means queued */
+  work->arg    = arg;                /* Callback argument */
+  work->u.s.qtime = clock() + delay; /* Delay until work performed */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
-    }
+  /* Do the easy case first -- when the work queue is empty. */
 
-  /* Initialize the work structure */
-
-  work->worker = worker;           /* Work callback. non-NULL means queued */
-  work->arg    = arg;              /* Callback argument */
-  work->delay  = delay;            /* Delay until work performed */
+  if (wqueue->q.head == NULL)
+    {
+      /* Add the work to the head == tail of the queue. */
 
-  /* Now, time-tag that entry and put it in the work queue. */
+      sq_addfirst(&work->u.s.sq, &wqueue->q);
+      _SEM_POST(&wqueue->wake);
+    }
 
-  work->qtime  = clock(); /* Time work queued */
+  /* There is other pending work in the work queue */
 
-  dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
-  kill(wqueue->pid, SIGWORK);   /* Wake up the worker thread */
+  else
+    {
+      curr = wqueue->q.head;
+
+      /* Check if the new work must be inserted before the curr. */
+
+      do
+        {
+          delta = work->u.s.qtime - ((FAR struct work_s *)curr)->u.s.qtime;
+          if (delta < 0)
+            {
+              break;
+            }
+
+          prev = curr;
+          curr = curr->flink;
+        }
+      while (curr != NULL);
+
+      /* Insert the new work in the list */
+
+      if (prev == NULL)
+        {
+          /* Insert the work at the head of the list */
+
+          sq_addfirst(&work->u.s.sq, &wqueue->q);
+          _SEM_GETVALUE(&wqueue->wake, &semcount);
+          if (semcount < 1)
+            {
+              _SEM_POST(&wqueue->wake);
+            }
+        }
+      else
+        {
+          /* Insert the work in mid- or end-of-queue */
+
+          sq_addafter(prev, &work->u.s.sq, &wqueue->q);
+        }
+    }
 
   _SEM_POST(&wqueue->lock);
   return OK;
@@ -146,6 +184,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 {
   if (qid == USRWORK)
     {
+      /* Is there already pending work? */
+
+      work_cancel(qid, work);
+
       return work_qqueue(&g_usrwork, work, worker, arg, delay);
     }
   else
diff --git a/libs/libc/wqueue/work_signal.c b/libs/libc/wqueue/work_signal.c
deleted file mode 100644
index eae548b..0000000
--- a/libs/libc/wqueue/work_signal.c
+++ /dev/null
@@ -1,99 +0,0 @@
-/****************************************************************************
- * libs/libc/wqueue/work_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/****************************************************************************
- * Private Type Declarations
- ****************************************************************************/
-
-/****************************************************************************
- * Public Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Data
- ****************************************************************************/
-
-/****************************************************************************
- * Private Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
-  int ret;
-
-  if (qid == USRWORK)
-    {
-      /* Signal the worker thread */
-
-      ret = kill(g_usrwork.pid, SIGWORK);
-      if (ret < 0)
-        {
-          int errcode = get_errno();
-          ret = -errcode;
-        }
-    }
-  else
-    {
-      ret = -EINVAL;
-    }
-
-  return ret;
-}
-
-#endif /* CONFIG_LIB_USRWORK && !__KERNEL__ */
diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c
index 0c71b8a..8c2b345 100644
--- a/libs/libc/wqueue/work_usrthread.c
+++ b/libs/libc/wqueue/work_usrthread.c
@@ -27,7 +27,6 @@
 #include <stdint.h>
 #include <unistd.h>
 #include <pthread.h>
-#include <signal.h>
 #include <sched.h>
 #include <errno.h>
 #include <assert.h>
@@ -45,26 +44,12 @@
  * Pre-processor Definitions
  ****************************************************************************/
 
-/* Use CLOCK_MONOTONIC if it is available.  CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-#  define WORK_CLOCK CLOCK_MONOTONIC
-#else
-#  define WORK_CLOCK CLOCK_REALTIME
-#endif
-
 #ifdef CONFIG_SYSTEM_TIME64
 #  define WORK_DELAY_MAX UINT64_MAX
 #else
 #  define WORK_DELAY_MAX UINT32_MAX
 #endif
 
-#ifndef MIN
-#  define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
 /****************************************************************************
  * Private Type Declarations
  ****************************************************************************/
@@ -98,17 +83,12 @@ struct usr_wqueue_s g_usrwork;
  *
  ****************************************************************************/
 
-void work_process(FAR struct usr_wqueue_s *wqueue)
+static void work_process(FAR struct usr_wqueue_s *wqueue)
 {
   volatile FAR struct work_s *work;
-  sigset_t sigset;
-  sigset_t oldset;
-  worker_t  worker;
+  worker_t worker;
   FAR void *arg;
-  clock_t elapsed;
-  clock_t remaining;
-  clock_t stick;
-  clock_t ctick;
+  sclock_t elapsed;
   clock_t next;
   int ret;
 
@@ -125,15 +105,6 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       return;
     }
 
-  /* Set up the signal mask */
-
-  sigemptyset(&sigset);
-  sigaddset(&sigset, SIGWORK);
-
-  /* Get the time that we started this polling cycle in clock ticks. */
-
-  stick = clock();
-
   /* And check each entry in the work queue.  Since we have locked the
    * work queue we know:  (1) we will not be suspended unless we do
    * so ourselves, and (2) there will be no changes to the work queue
@@ -142,19 +113,21 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
   work = (FAR struct work_s *)wqueue->q.head;
   while (work)
     {
-      /* Is this work ready?  It is ready if there is no delay or if
-       * the delay has elapsed. qtime is the time that the work was added
-       * to the work queue.  It will always be greater than or equal to
-       * zero.  Therefore a delay of zero will always execute immediately.
+      /* Is this work ready? It is ready once the current time has
+       * reached qtime, the time at which the work was queued plus
+       * its requested delay. Therefore work queued with a delay of
+       * zero will always execute immediately.
        */
 
-      ctick   = clock();
-      elapsed = ctick - work->qtime;
-      if (elapsed >= work->delay)
+      elapsed = clock() - work->u.s.qtime;
+
+      /* Is this delay work ready? */
+
+      if (elapsed >= 0)
         {
           /* Remove the ready-to-execute work from the list */
 
-          dq_rem((struct dq_entry_s *)work, &wqueue->q);
+          sq_remfirst(&wqueue->q);
 
           /* Extract the work description from the entry (in case the work
            * instance by the re-used after it has been de-queued).
@@ -195,65 +168,26 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
 
                   return;
                 }
-
-              work = (FAR struct work_s *)wqueue->q.head;
             }
-          else
-            {
-              /* Canceled.. Just move to the next work in the list with
-               * the work queue still locked.
-               */
 
-              work = (FAR struct work_s *)work->dq.flink;
-            }
+          work = (FAR struct work_s *)wqueue->q.head;
         }
-      else /* elapsed < work->delay */
+      else
         {
-          /* This one is not ready.
-           *
-           * NOTE that elapsed is relative to the current time,
-           * not the time of beginning of this queue processing pass.
-           * So it may need an adjustment.
-           */
-
-          elapsed += (ctick - stick);
-          if (elapsed > work->delay)
-            {
-              /* The delay has expired while we are processing */
-
-              elapsed = work->delay;
-            }
-
-          /* Will it be ready before the next scheduled wakeup interval? */
-
-          remaining = work->delay - elapsed;
-          if (remaining < next)
-            {
-              /* Yes.. Then schedule to wake up when the work is ready */
-
-              next = remaining;
-            }
-
-          /* Then try the next in the list. */
-
-          work = (FAR struct work_s *)work->dq.flink;
+          next = work->u.s.qtime - clock();
+          break;
         }
     }
 
-  /* Unlock the work queue before waiting.  In order to assure that we do
-   * not lose the SIGWORK signal before waiting, we block the SIGWORK
-   * signals before unlocking the work queue.  That will cause in SIGWORK
-   * signals directed to the worker thread to pend.
-   */
+  /* Unlock the work queue before waiting. */
 
-  sigprocmask(SIG_BLOCK, &sigset, &oldset);
   _SEM_POST(&wqueue->lock);
 
   if (next == WORK_DELAY_MAX)
     {
-      /* Wait indefinitely until signaled with SIGWORK */
+      /* Wait indefinitely until work_queue has new items */
 
-      sigwaitinfo(&sigset, NULL);
+      _SEM_WAIT(&wqueue->wake);
     }
   else
     {
@@ -261,7 +195,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       time_t sec;
 
       /* Wait awhile to check the work list.  We will wait here until
-       * either the time elapses or until we are awakened by a signal.
+       * either the time elapses or until we are awakened by a semaphore.
        * Interrupts will be re-enabled while we wait.
        */
 
@@ -269,10 +203,8 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
       rqtp.tv_sec  = sec;
       rqtp.tv_nsec = (next - (sec * 1000000)) * 1000;
 
-      sigtimedwait(&sigset, NULL, &rqtp);
+      _SEM_TIMEDWAIT(&wqueue->wake, &rqtp);
     }
-
-  sigprocmask(SIG_SETMASK, &oldset, NULL);
 }
 
 /****************************************************************************
@@ -339,53 +271,48 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg)
 
 int work_usrstart(void)
 {
+  int ret;
+#ifndef CONFIG_BUILD_PROTECTED
+  pthread_t usrwork;
+  pthread_attr_t attr;
+  struct sched_param param;
+#endif
+
   /* Set up the work queue lock */
 
   _SEM_INIT(&g_usrwork.lock, 0, 1);
 
+  _SEM_INIT(&g_usrwork.wake, 0, 0);
+  _SEM_SETPROTOCOL(&g_usrwork.wake, SEM_PRIO_NONE);
+
+  /* Initialize the work queue */
+
+  sq_init(&g_usrwork.q);
+
 #ifdef CONFIG_BUILD_PROTECTED
 
   /* Start a user-mode worker thread for use by applications. */
 
-  g_usrwork.pid = task_create("uwork",
-                              CONFIG_LIB_USRWORKPRIORITY,
-                              CONFIG_LIB_USRWORKSTACKSIZE,
-                              (main_t)work_usrthread,
-                              (FAR char * const *)NULL);
-
-  DEBUGASSERT(g_usrwork.pid > 0);
-  if (g_usrwork.pid < 0)
+  ret = task_create("uwork",
+                    CONFIG_LIB_USRWORKPRIORITY,
+                    CONFIG_LIB_USRWORKSTACKSIZE,
+                    (main_t)work_usrthread,
+                    ((FAR char * const *)NULL));
+  if (ret < 0)
     {
       int errcode = get_errno();
       DEBUGASSERT(errcode > 0);
       return -errcode;
     }
 
-  return g_usrwork.pid;
+  return ret;
 #else
-  pthread_t usrwork;
-  pthread_attr_t attr;
-  struct sched_param param;
-  int ret;
-
   /* Start a user-mode worker thread for use by applications. */
 
   pthread_attr_init(&attr);
   pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE);
 
-#ifdef CONFIG_SCHED_SPORADIC
-  /* Get the current sporadic scheduling parameters.  Those will not be
-   * modified.
-   */
-
-  ret = set_getparam(pid, &param);
-  if (ret < 0)
-    {
-      int errcode = get_errno();
-      return -errcode;
-    }
-#endif
-
+  pthread_attr_getschedparam(&attr, &param);
   param.sched_priority = CONFIG_LIB_USRWORKPRIORITY;
   pthread_attr_setschedparam(&attr, &param);
 
@@ -401,8 +328,7 @@ int work_usrstart(void)
 
   pthread_detach(usrwork);
 
-  g_usrwork.pid = (pid_t)usrwork;
-  return g_usrwork.pid;
+  return (pid_t)usrwork;
 #endif
 }
 
diff --git a/libs/libc/wqueue/wqueue.h b/libs/libc/wqueue/wqueue.h
index e84c229..4c03421 100644
--- a/libs/libc/wqueue/wqueue.h
+++ b/libs/libc/wqueue/wqueue.h
@@ -46,9 +46,9 @@
 
 struct usr_wqueue_s
 {
-  struct dq_queue_s q;      /* The queue of pending work */
+  struct sq_queue_s q;      /* The queue of pending work */
   sem_t             lock;   /* exclusive access to user-mode work queue */
-  pid_t             pid;    /* The task ID of the worker thread(s) */
+  sem_t             wake;   /* The wake-up semaphore of the usrthread */
 };
 
 /****************************************************************************