You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by xi...@apache.org on 2021/07/28 04:01:46 UTC
[incubator-nuttx] 02/04: work_queue: schedule the work queue using
the timer mechanism
This is an automated email from the ASF dual-hosted git repository.
xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git
commit 855c78bb9dc48d4f2655257fb7ee01ff9c456cbc
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Sat Jun 19 17:29:30 2021 +0800
work_queue: schedule the work queue using the timer mechanism
Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
include/nuttx/wqueue.h | 35 ++----
sched/wqueue/Make.defs | 3 +-
sched/wqueue/kwork_cancel.c | 17 +--
sched/wqueue/kwork_process.c | 271 -------------------------------------------
sched/wqueue/kwork_queue.c | 130 ++++++++++-----------
sched/wqueue/kwork_signal.c | 111 ------------------
sched/wqueue/kwork_thread.c | 103 ++++++++++++----
sched/wqueue/wqueue.h | 34 ++----
8 files changed, 174 insertions(+), 530 deletions(-)
diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h
index e78c46c..5a2d221 100644
--- a/include/nuttx/wqueue.h
+++ b/include/nuttx/wqueue.h
@@ -32,6 +32,7 @@
#include <queue.h>
#include <nuttx/clock.h>
+#include <nuttx/wdog.h>
/****************************************************************************
* Pre-processor Definitions
@@ -244,11 +245,17 @@ typedef CODE void (*worker_t)(FAR void *arg);
struct work_s
{
- struct dq_entry_s dq; /* Implements a doubly linked list */
- worker_t worker; /* Work callback */
- FAR void *arg; /* Callback argument */
- clock_t qtime; /* Time work queued */
- clock_t delay; /* Delay until work performed */
+ union
+ {
+ struct
+ {
+ struct sq_entry_s sq; /* Implements a singly linked list */
+ clock_t qtime; /* Time work queued */
+ } s;
+ struct wdog_s timer; /* Delay expiry timer */
+ } u;
+ worker_t worker; /* Work callback */
+ FAR void *arg; /* Callback argument */
};
/* This is an enumeration of the various events that may be
@@ -375,24 +382,6 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
int work_cancel(int qid, FAR struct work_s *work);
/****************************************************************************
- * Name: work_signal
- *
- * Description:
- * Signal the worker thread to process the work queue now. This function
- * is used internally by the work logic but could also be used by the
- * user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- * qid - The work queue ID
- *
- * Returned Value:
- * Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid);
-
-/****************************************************************************
* Name: work_available
*
* Description:
diff --git a/sched/wqueue/Make.defs b/sched/wqueue/Make.defs
index bc31d53..52fc34b 100644
--- a/sched/wqueue/Make.defs
+++ b/sched/wqueue/Make.defs
@@ -22,8 +22,7 @@
ifeq ($(CONFIG_SCHED_WORKQUEUE),y)
-CSRCS += kwork_queue.c kwork_process.c kwork_cancel.c kwork_signal.c
-CSRCS += kwork_thread.c
+CSRCS += kwork_queue.c kwork_thread.c kwork_cancel.c
ifeq ($(CONFIG_PRIORITY_INHERITANCE),y)
CSRCS += kwork_inherit.c
diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index 69886b9..b5936b5 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -77,18 +77,19 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
flags = enter_critical_section();
if (work->worker != NULL)
{
- /* A little test of the integrity of the work queue */
-
- DEBUGASSERT(work->dq.flink != NULL ||
- (FAR dq_entry_t *)work == wqueue->q.tail);
- DEBUGASSERT(work->dq.blink != NULL ||
- (FAR dq_entry_t *)work == wqueue->q.head);
-
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/
- dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+ if (WDOG_ISACTIVE(&work->u.timer))
+ {
+ wd_cancel(&work->u.timer);
+ }
+ else
+ {
+ sq_rem((FAR sq_entry_t *)work, &wqueue->q);
+ }
+
work->worker = NULL;
ret = OK;
}
diff --git a/sched/wqueue/kwork_process.c b/sched/wqueue/kwork_process.c
deleted file mode 100644
index 9bec97e..0000000
--- a/sched/wqueue/kwork_process.c
+++ /dev/null
@@ -1,271 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_process.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <debug.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <assert.h>
-#include <queue.h>
-
-#include <nuttx/irq.h>
-#include <nuttx/clock.h>
-#include <nuttx/signal.h>
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/* Use CLOCK_MONOTONIC if it is available. CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-# define WORK_CLOCK CLOCK_MONOTONIC
-#else
-# define WORK_CLOCK CLOCK_REALTIME
-#endif
-
-#ifdef CONFIG_SYSTEM_TIME64
-# define WORK_DELAY_MAX UINT64_MAX
-#else
-# define WORK_DELAY_MAX UINT32_MAX
-#endif
-
-#ifndef MIN
-# define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
-#ifndef CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE
-# define CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE 0
-#endif
-
-#if CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
-# define CALL_WORKER(worker, arg) \
- do \
- { \
- uint32_t start; \
- uint32_t elapsed; \
- start = up_critmon_gettime(); \
- worker(arg); \
- elapsed = up_critmon_gettime() - start; \
- if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
- { \
- serr("WORKER %p execute too long %"PRIu32"\n", \
- worker, elapsed); \
- } \
- } \
- while (0)
-#else
-# define CALL_WORKER(worker, arg) worker(arg)
-#endif
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_process
- *
- * Description:
- * This is the logic that performs actions placed on any work list. This
- * logic is the common underlying logic to all work queues. This logic is
- * part of the internal implementation of each work queue; it should not
- * be called from application level logic.
- *
- * Input Parameters:
- * wqueue - Describes the work queue to be processed
- *
- * Returned Value:
- * None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx)
-{
- volatile FAR struct work_s *work;
- worker_t worker;
- irqstate_t flags;
- FAR void *arg;
- clock_t elapsed;
- clock_t remaining;
- clock_t stick;
- clock_t ctick;
- clock_t next;
-
- /* Then process queued work. We need to keep interrupts disabled while
- * we process items in the work list.
- */
-
- next = WORK_DELAY_MAX;
- flags = enter_critical_section();
-
- /* Get the time that we started processing the queue in clock ticks. */
-
- stick = clock_systime_ticks();
-
- /* And check each entry in the work queue. Since we have disabled
- * interrupts we know: (1) we will not be suspended unless we do
- * so ourselves, and (2) there will be no changes to the work queue
- */
-
- work = (FAR struct work_s *)wqueue->q.head;
- while (work != NULL)
- {
- /* Is this work ready? It is ready if there is no delay or if
- * the delay has elapsed. qtime is the time that the work was added
- * to the work queue. It will always be greater than or equal to
- * zero. Therefore a delay of zero will always execute immediately.
- */
-
- ctick = clock_systime_ticks();
- elapsed = ctick - work->qtime;
- if (elapsed >= work->delay)
- {
- /* Remove the ready-to-execute work from the list */
-
- dq_rem((struct dq_entry_s *)work, &wqueue->q);
-
- /* Extract the work description from the entry (in case the work
- * instance by the re-used after it has been de-queued).
- */
-
- worker = work->worker;
-
- /* Check for a race condition where the work may be nullified
- * before it is removed from the queue.
- */
-
- if (worker != NULL)
- {
- /* Extract the work argument (before re-enabling interrupts) */
-
- arg = work->arg;
-
- /* Mark the work as no longer being queued */
-
- work->worker = NULL;
-
- /* Do the work. Re-enable interrupts while the work is being
- * performed... we don't have any idea how long this will take!
- */
-
- leave_critical_section(flags);
- CALL_WORKER(worker, arg);
-
- /* Now, unfortunately, since we re-enabled interrupts we don't
- * know the state of the work list and we will have to start
- * back at the head of the list.
- */
-
- flags = enter_critical_section();
- work = (FAR struct work_s *)wqueue->q.head;
- }
- else
- {
- /* Cancelled.. Just move to the next work in the list with
- * interrupts still disabled.
- */
-
- work = (FAR struct work_s *)work->dq.flink;
- }
- }
- else /* elapsed < work->delay */
- {
- /* This one is not ready.
- *
- * NOTE that elapsed is relative to the current time,
- * not the time of beginning of this queue processing pass.
- * So it may need an adjustment.
- */
-
- elapsed += (ctick - stick);
- if (elapsed > work->delay)
- {
- /* The delay has expired while we are processing */
-
- elapsed = work->delay;
- }
-
- /* Will it be ready before the next scheduled wakeup interval? */
-
- remaining = work->delay - elapsed;
- if (remaining < next)
- {
- /* Yes.. Then schedule to wake up when the work is ready */
-
- next = remaining;
- }
-
- /* Then try the next in the list. */
-
- work = (FAR struct work_s *)work->dq.flink;
- }
- }
-
- /* When multiple worker threads are created for this work queue, only
- * thread 0 (wndx = 0) will monitor the unexpired works.
- *
- * Other worker threads (wndx > 0) just process no-delay or expired
- * works, then sleep. The unexpired works are left in the queue. They
- * will be handled by thread 0 when it finishes current work and iterate
- * over the queue again.
- */
-
- if (wndx > 0 || next == WORK_DELAY_MAX)
- {
- sigset_t set;
-
- /* Wait indefinitely until signalled with SIGWORK */
-
- sigemptyset(&set);
- nxsig_addset(&set, SIGWORK);
-
- wqueue->worker[wndx].busy = false;
- DEBUGVERIFY(nxsig_waitinfo(&set, NULL));
- wqueue->worker[wndx].busy = true;
- }
- else
- {
- /* Wait a while to check the work list. We will wait here until
- * either the time elapses or until we are awakened by a signal.
- * Interrupts will be re-enabled while we wait.
- */
-
- wqueue->worker[wndx].busy = false;
- nxsig_usleep(next * USEC_PER_TICK);
- wqueue->worker[wndx].busy = true;
- }
-
- leave_critical_section(flags);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c
index 17fedb2..1b868f1 100644
--- a/sched/wqueue/kwork_queue.c
+++ b/sched/wqueue/kwork_queue.c
@@ -43,73 +43,32 @@
****************************************************************************/
/****************************************************************************
- * Name: work_qqueue
- *
- * Description:
- * Queue work to be performed at a later time. All queued work will be
- * performed on the worker thread of execution (not the caller's).
- *
- * The work structure is allocated by caller, but completely managed by
- * the work queue logic. The caller should never modify the contents of
- * the work queue structure; the caller should not call work_qqueue()
- * again until either (1) the previous work has been performed and removed
- * from the queue, or (2) work_cancel() has been called to cancel the work
- * and remove it from the work queue.
- *
- * Input Parameters:
- * qid - The work queue ID (index)
- * work - The work structure to queue
- * worker - The worker callback to be invoked. The callback will be
- * invoked on the worker thread of execution.
- * arg - The argument that will be passed to the worker callback when
- * int is invoked.
- * delay - Delay (in clock ticks) from the time queue until the worker
- * is invoked. Zero means to perform the work immediately.
- *
- * Returned Value:
- * None
- *
+ * Name: hp_work_timer_expiry
****************************************************************************/
-static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
- FAR struct work_s *work, worker_t worker,
- FAR void *arg, clock_t delay)
+#ifdef CONFIG_SCHED_HPWORK
+static void hp_work_timer_expiry(wdparm_t arg)
{
- irqstate_t flags;
-
- DEBUGASSERT(work != NULL && worker != NULL);
-
- /* Interrupts are disabled so that this logic can be called from with
- * task logic or ifrom nterrupt handling logic.
- */
-
- flags = enter_critical_section();
-
- /* Is there already pending work? */
-
- if (work->worker != NULL)
- {
- /* Remove the entry from the work queue. It will be requeued at the
- * end of the work queue.
- */
-
- dq_rem((FAR dq_entry_t *)work, &wqueue->q);
- }
-
- /* Initialize the work structure. */
-
- work->worker = worker; /* Work callback. non-NULL means queued */
- work->arg = arg; /* Callback argument */
- work->delay = delay; /* Delay until work performed */
-
- /* Now, time-tag that entry and put it in the work queue */
-
- work->qtime = clock_systime_ticks(); /* Time work queued */
+ irqstate_t flags = enter_critical_section();
+ sq_addlast((FAR sq_entry_t *)arg, &g_hpwork.q);
+ nxsem_post(&g_hpwork.sem);
+ leave_critical_section(flags);
+}
+#endif
- dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
+/****************************************************************************
+ * Name: lp_work_timer_expiry
+ ****************************************************************************/
+#ifdef CONFIG_SCHED_LPWORK
+static void lp_work_timer_expiry(wdparm_t arg)
+{
+ irqstate_t flags = enter_critical_section();
+ sq_addlast((FAR sq_entry_t *)arg, &g_lpwork.q);
+ nxsem_post(&g_lpwork.sem);
leave_critical_section(flags);
}
+#endif
/****************************************************************************
* Public Functions
@@ -148,6 +107,23 @@ static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
int work_queue(int qid, FAR struct work_s *work, worker_t worker,
FAR void *arg, clock_t delay)
{
+ irqstate_t flags;
+
+ /* Remove the entry from the timer and work queue. */
+
+ work_cancel(qid, work);
+
+ /* Interrupts are disabled so that this logic can be called from within
+ * task logic or from interrupt handling logic.
+ */
+
+ flags = enter_critical_section();
+
+ /* Initialize the work structure. */
+
+ work->worker = worker; /* Work callback. non-NULL means queued */
+ work->arg = arg; /* Callback argument */
+
/* Queue the new work */
#ifdef CONFIG_SCHED_HPWORK
@@ -155,9 +131,16 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
{
/* Queue high priority work */
- work_qqueue((FAR struct kwork_wqueue_s *)&g_hpwork, work, worker,
- arg, delay);
- return work_signal(HPWORK);
+ if (!delay)
+ {
+ sq_addlast((FAR sq_entry_t *)work, &g_hpwork.q);
+ nxsem_post(&g_hpwork.sem);
+ }
+ else
+ {
+ wd_start(&work->u.timer, delay, hp_work_timer_expiry,
+ (wdparm_t)work);
+ }
}
else
#endif
@@ -166,15 +149,22 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
{
/* Queue low priority work */
- work_qqueue((FAR struct kwork_wqueue_s *)&g_lpwork, work, worker,
- arg, delay);
- return work_signal(LPWORK);
+ if (!delay)
+ {
+ sq_addlast((FAR sq_entry_t *)work, &g_lpwork.q);
+ nxsem_post(&g_lpwork.sem);
+ }
+ else
+ {
+ wd_start(&work->u.timer, delay, lp_work_timer_expiry,
+ (wdparm_t)work);
+ }
}
- else
#endif
- {
- return -EINVAL;
- }
+
+ leave_critical_section(flags);
+
+ return OK;
}
#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_signal.c b/sched/wqueue/kwork_signal.c
deleted file mode 100644
index 0f2084f..0000000
--- a/sched/wqueue/kwork_signal.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-#include <nuttx/signal.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- * Signal the worker thread to process the work queue now. This function
- * is used internally by the work logic but could also be used by the
- * user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- * qid - The work queue ID
- *
- * Returned Value:
- * Zero (OK) on success, a negated errno value on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
- FAR struct kwork_wqueue_s *work;
- int threads;
- int i;
-
- /* Get the process ID of the worker thread */
-
-#ifdef CONFIG_SCHED_HPWORK
- if (qid == HPWORK)
- {
- work = (FAR struct kwork_wqueue_s *)&g_hpwork;
- threads = CONFIG_SCHED_HPNTHREADS;
- }
- else
-#endif
-#ifdef CONFIG_SCHED_LPWORK
- if (qid == LPWORK)
- {
- work = (FAR struct kwork_wqueue_s *)&g_lpwork;
- threads = CONFIG_SCHED_LPNTHREADS;
- }
- else
-#endif
- {
- return -EINVAL;
- }
-
- /* Find an IDLE worker thread */
-
- for (i = 0; i < threads; i++)
- {
- /* Is this worker thread busy? */
-
- if (!work->worker[i].busy)
- {
- /* No.. select this thread */
-
- break;
- }
- }
-
- /* If all of the IDLE threads are busy, then just return successfully */
-
- if (i >= threads)
- {
- return OK;
- }
-
- /* Otherwise, signal the first IDLE thread found */
-
- return nxsig_kill(work->worker[i].pid, SIGWORK);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 3c3b7eb..2e48399 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -36,12 +36,37 @@
#include <nuttx/wqueue.h>
#include <nuttx/kthread.h>
+#include <nuttx/semaphore.h>
#include "wqueue/wqueue.h"
#if defined(CONFIG_SCHED_WORKQUEUE)
/****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#if defined(CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) && CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
+# define CALL_WORKER(worker, arg) \
+ do \
+ { \
+ uint32_t start; \
+ uint32_t elapsed; \
+ start = up_critmon_gettime(); \
+ worker(arg); \
+ elapsed = up_critmon_gettime() - start; \
+ if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
+ { \
+ serr("WORKER %p execute too long %"PRIu32"\n", \
+ worker, elapsed); \
+ } \
+ } \
+ while (0)
+#else
+# define CALL_WORKER(worker, arg) worker(arg)
+#endif
+
+/****************************************************************************
* Public Data
****************************************************************************/
@@ -65,45 +90,84 @@ struct lp_wqueue_s g_lpwork;
* Name: work_thread
*
* Description:
- * These are the worker threads that performs the actions placed on the
+ * These are the worker threads that perform the actions placed on the
* high priority work queue.
*
* These, along with the lower priority worker thread(s) are the kernel
- * mode work queues (also build in the flat build).
+ * mode work queues (also built in the flat build).
*
* All kernel mode worker threads are started by the OS during normal
* bring up. This entry point is referenced by OS internally and should
* not be accessed by application logic.
*
* Input Parameters:
- * argc, argv (not used)
+ * argc, argv
*
* Returned Value:
* Does not return
*
****************************************************************************/
-static int work_thread(int argc, char *argv[])
+static int work_thread(int argc, FAR char *argv[])
{
- FAR struct kwork_wqueue_s *queue;
- int wndx;
+ FAR struct kwork_wqueue_s *wqueue;
+ FAR struct work_s *work;
+ worker_t worker;
+ irqstate_t flags;
+ FAR void *arg;
- queue = (FAR struct kwork_wqueue_s *)
+ wqueue = (FAR struct kwork_wqueue_s *)
((uintptr_t)strtoul(argv[1], NULL, 0));
- wndx = atoi(argv[2]);
+
+ flags = enter_critical_section();
/* Loop forever */
for (; ; )
{
/* Then process queued work. work_process will not return until: (1)
- * there is no further work in the work queue, and (2) signal is
- * triggered, or delayed work expires.
+ * there is no further work in the work queue, and (2) semaphore is
+ * posted.
+ */
+
+ nxsem_wait_uninterruptible(&wqueue->sem);
+
+ /* Take the first entry from the work queue. Since we have disabled
+ * interrupts we know: (1) we will not be suspended unless we do
+ * so ourselves, and (2) there will be no changes to the work queue
*/
- work_process(queue, wndx);
+ /* Remove the ready-to-execute work from the list */
+
+ work = (FAR struct work_s *)sq_remfirst(&wqueue->q);
+ if (work && work->worker)
+ {
+ /* Extract the work description from the entry (in case the work
+ * instance will be re-used after it has been de-queued).
+ */
+
+ worker = work->worker;
+
+ /* Extract the work argument (before re-enabling interrupts) */
+
+ arg = work->arg;
+
+ /* Mark the work as no longer being queued */
+
+ work->worker = NULL;
+
+ /* Do the work. Re-enable interrupts while the work is being
+ * performed... we don't have any idea how long this will take!
+ */
+
+ leave_critical_section(flags);
+ CALL_WORKER(worker, arg);
+ flags = enter_critical_section();
+ }
}
+ leave_critical_section(flags);
+
return OK; /* To keep some compilers happy */
}
@@ -130,14 +194,17 @@ static int work_thread_create(FAR const char *name, int priority,
int stack_size, int nthread,
FAR struct kwork_wqueue_s *wqueue)
{
- FAR char *argv[3];
- char args[2][16];
+ FAR char *argv[2];
+ char args[16];
int wndx;
int pid;
- snprintf(args[0], 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
- argv[0] = args[0];
- argv[2] = NULL;
+ snprintf(args, 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
+ argv[0] = args;
+ argv[1] = NULL;
+
+ nxsem_init(&wqueue->sem, 0, 0);
+ nxsem_set_protocol(&wqueue->sem, SEM_PRIO_NONE);
/* Don't permit any of the threads to run until we have fully initialized
* g_hpwork and g_lpwork.
@@ -147,9 +214,6 @@ static int work_thread_create(FAR const char *name, int priority,
for (wndx = 0; wndx < nthread; wndx++)
{
- snprintf(args[1], 16, "%d", wndx);
- argv[1] = args[1];
-
pid = kthread_create(name, priority, stack_size,
(main_t)work_thread, argv);
@@ -164,7 +228,6 @@ static int work_thread_create(FAR const char *name, int priority,
#ifdef CONFIG_PRIORITY_INHERITANCE
wqueue->worker[wndx].pid = pid;
#endif
- wqueue->worker[wndx].busy = true;
}
sched_unlock();
diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h
index 106cb2d..774cebc 100644
--- a/sched/wqueue/wqueue.h
+++ b/sched/wqueue/wqueue.h
@@ -51,15 +51,17 @@
struct kworker_s
{
- pid_t pid; /* The task ID of the worker thread */
- volatile bool busy; /* True: Worker is not available */
+#ifdef CONFIG_PRIORITY_INHERITANCE
+ pid_t pid; /* The task ID of the worker thread */
+#endif
};
/* This structure defines the state of one kernel-mode work queue */
struct kwork_wqueue_s
{
- struct dq_queue_s q; /* The queue of pending work */
+ struct sq_queue_s q; /* The queue of pending work */
+ sem_t sem; /* The counting semaphore of the wqueue */
struct kworker_s worker[1]; /* Describes a worker thread */
};
@@ -70,7 +72,8 @@ struct kwork_wqueue_s
#ifdef CONFIG_SCHED_HPWORK
struct hp_wqueue_s
{
- struct dq_queue_s q; /* The queue of pending work */
+ struct sq_queue_s q; /* The queue of pending work */
+ sem_t sem; /* The counting semaphore of the wqueue */
/* Describes each thread in the high priority queue's thread pool */
@@ -85,7 +88,8 @@ struct hp_wqueue_s
#ifdef CONFIG_SCHED_LPWORK
struct lp_wqueue_s
{
- struct dq_queue_s q; /* The queue of pending work */
+ struct sq_queue_s q; /* The queue of pending work */
+ sem_t sem; /* The counting semaphore of the wqueue */
/* Describes each thread in the low priority queue's thread pool */
@@ -152,26 +156,6 @@ int work_start_lowpri(void);
#endif
/****************************************************************************
- * Name: work_process
- *
- * Description:
- * This is the logic that performs actions placed on any work list. This
- * logic is the common underlying logic to all work queues. This logic is
- * part of the internal implementation of each work queue; it should not
- * be called from application level logic.
- *
- * Input Parameters:
- * wqueue - Describes the work queue to be processed
- * wndx - The worker thread index
- *
- * Returned Value:
- * None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx);
-
-/****************************************************************************
* Name: work_initialize_notifier
*
* Description: